Merge pull request #50 from martinsumner/mas-sstbinarytrim-i42

Mas sstbinarytrim i42
This commit is contained in:
Martin Sumner 2017-03-20 10:28:55 +00:00 committed by GitHub
commit 0cdc0eb558

View file

@ -196,12 +196,23 @@ sst_get(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity). gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
gen_fsm:sync_send_event(Pid, Reply = gen_fsm:sync_send_event(Pid,
{get_kvrange, StartKey, EndKey, ScanWidth}, {get_kvrange, StartKey, EndKey, ScanWidth},
infinity). infinity),
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
{SlotsToFetchBinList, SlotsToPoint} = Reply,
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint.
sst_getslots(Pid, SlotList) -> sst_getslots(Pid, SlotList) ->
gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity). SlotBins = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity),
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
lists:foldl(FetchFun, [], SlotBins).
sst_getmaxsequencenumber(Pid) -> sst_getmaxsequencenumber(Pid) ->
gen_fsm:sync_send_event(Pid, get_maxsequencenumber, infinity). gen_fsm:sync_send_event(Pid, get_maxsequencenumber, infinity).
@ -310,11 +321,7 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
State}; State};
reader({get_slots, SlotList}, _From, State) -> reader({get_slots, SlotList}, _From, State) ->
SlotBins = read_slots(State#state.handle, SlotList), SlotBins = read_slots(State#state.handle, SlotList),
FetchFun = {reply, SlotBins, reader, State};
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
{reply, lists:foldl(FetchFun, [], SlotBins), reader, State};
reader(get_maxsequencenumber, _From, State) -> reader(get_maxsequencenumber, _From, State) ->
Summary = State#state.summary, Summary = State#state.summary,
{reply, Summary#summary.max_sqn, reader, State}; {reply, Summary#summary.max_sqn, reader, State};
@ -353,15 +360,7 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
?DELETE_TIMEOUT}; ?DELETE_TIMEOUT};
delete_pending({get_slots, SlotList}, _From, State) -> delete_pending({get_slots, SlotList}, _From, State) ->
SlotBins = read_slots(State#state.handle, SlotList), SlotBins = read_slots(State#state.handle, SlotList),
FetchFun = {reply, SlotBins, delete_pending, State, ?DELETE_TIMEOUT};
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
{reply,
lists:foldl(FetchFun, [], SlotBins),
delete_pending,
State,
?DELETE_TIMEOUT};
delete_pending(close, _From, State) -> delete_pending(close, _From, State) ->
leveled_log:log("SST07", [State#state.filename]), leveled_log:log("SST07", [State#state.filename]),
ok = file:close(State#state.handle), ok = file:close(State#state.handle),
@ -500,12 +499,7 @@ fetch_range(StartKey, EndKey, ScanWidth, State) ->
end, end,
SlotsToFetchBinList = read_slots(Handle, SlotsToFetch), SlotsToFetchBinList = read_slots(Handle, SlotsToFetch),
{SlotsToFetchBinList, SlotsToPoint}.
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint.
write_file(RootPath, Filename, SummaryBin, SlotsBin) -> write_file(RootPath, Filename, SummaryBin, SlotsBin) ->
@ -915,41 +909,34 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) ->
LTrimFun = fun({K, _V}) -> K < StartKey end, LTrimFun = fun({K, _V}) -> K < StartKey end,
RTrimFun = fun({K, _V}) -> not leveled_codec:endkey_passed(EndKey, K) end, RTrimFun = fun({K, _V}) -> not leveled_codec:endkey_passed(EndKey, K) end,
BlockFetchFun = BlockFetchFun =
fun(Length, {Acc, Bin}) -> fun(Length, {Acc, Bin, Continue}) ->
case Length of case {Length, Continue} of
0 -> {0, _} ->
{Acc, Bin}; {Acc, Bin, false};
_ -> {_, true} ->
<<Block:Length/binary, Rest/binary>> = Bin, <<Block:Length/binary, Rest/binary>> = Bin,
BlockList = binary_to_term(Block), BlockList = binary_to_term(Block),
{FirstKey, _FV} = lists:nth(1, BlockList),
{LastKey, _LV} = lists:last(BlockList), {LastKey, _LV} = lists:last(BlockList),
TrimBools = trim_booleans(FirstKey, LastKey, case StartKey > LastKey of
StartKey, EndKey), true ->
case TrimBools of {Acc, Rest, true};
{true, _, _, _} -> false ->
{Acc, Rest};
{false, true, _, _} ->
{Acc ++ BlockList, Rest};
{false, false, true, false} ->
{_LDrop, RKeep} = lists:splitwith(LTrimFun, {_LDrop, RKeep} = lists:splitwith(LTrimFun,
BlockList), BlockList),
{Acc ++ RKeep, Rest}; case leveled_codec:endkey_passed(EndKey, LastKey) of
{false, false, false, true} -> true ->
{LKeep, _RDrop} = lists:splitwith(RTrimFun, {LKeep, _RDrop} = lists:splitwith(RTrimFun, RKeep),
BlockList), {Acc ++ LKeep, Rest, false};
{Acc ++ LKeep, Rest}; false ->
{false, false, true, true} -> {Acc ++ RKeep, Rest, true}
{_LDrop, RKeep} = lists:splitwith(LTrimFun, end
BlockList), end;
{LKeep, _RDrop} = lists:splitwith(RTrimFun, RKeep), {_ , false} ->
{Acc ++ LKeep, Rest} {Acc, Bin, false}
end
end end
end, end,
{Out, _Rem} = {Out, _Rem, _Continue} =
case crc_check_slot(FullBin) of case crc_check_slot(FullBin) of
{BlockLengths, RestBin} -> {BlockLengths, RestBin} ->
<<B1P:32/integer, <<B1P:32/integer,
@ -958,51 +945,13 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) ->
B3L:32/integer, B3L:32/integer,
B4L:32/integer>> = BlockLengths, B4L:32/integer>> = BlockLengths,
<<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin,
lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]); lists:foldl(BlockFetchFun, {[], Blocks, true}, [B1L, B2L, B3L, B4L]);
crc_wonky -> crc_wonky ->
{[], <<>>} {[], <<>>, true}
end, end,
Out. Out.
trim_booleans(FirstKey, _LastKey, StartKey, all) ->
FirstKeyPassedStart = FirstKey > StartKey,
case FirstKeyPassedStart of
true ->
{false, true, false, false};
false ->
{false, false, true, false}
end;
trim_booleans(_FirstKey, LastKey, all, EndKey) ->
LastKeyPassedEnd = leveled_codec:endkey_passed(EndKey, LastKey),
case LastKeyPassedEnd of
true ->
{false, false, false, true};
false ->
{false, true, false, false}
end;
trim_booleans(FirstKey, LastKey, StartKey, EndKey) ->
FirstKeyPassedStart = FirstKey > StartKey,
PreRange = LastKey < StartKey,
PostRange = leveled_codec:endkey_passed(EndKey, FirstKey),
OutOfRange = PreRange or PostRange,
LastKeyPassedEnd = leveled_codec:endkey_passed(EndKey, LastKey),
case OutOfRange of
true ->
{true, false, false, false};
false ->
case {FirstKeyPassedStart, LastKeyPassedEnd} of
{true, false} ->
{false, true, false, false};
{false, false} ->
{false, false, true, false};
{true, true} ->
{false, false, false, true};
{false, true} ->
{false, false, true, true}
end
end.
crc_check_slot(FullBin) -> crc_check_slot(FullBin) ->
<<CRC32:32/integer, SlotBin/binary>> = FullBin, <<CRC32:32/integer, SlotBin/binary>> = FullBin,
@ -1162,15 +1111,6 @@ form_slot(KVList1, KVList2, _LI, no_lookup, ?NOLOOK_SLOTSIZE, Slot, FK) ->
{KVList1, KVList2, {no_lookup, lists:reverse(Slot)}, FK}; {KVList1, KVList2, {no_lookup, lists:reverse(Slot)}, FK};
form_slot(KVList1, KVList2, {IsBasement, TS}, lookup, Size, Slot, FK) -> form_slot(KVList1, KVList2, {IsBasement, TS}, lookup, Size, Slot, FK) ->
case {key_dominates(KVList1, KVList2, {IsBasement, TS}), FK} of case {key_dominates(KVList1, KVList2, {IsBasement, TS}), FK} of
{{{next_key, TopKV}, Rem1, Rem2}, null} ->
{TopK, _TopV} = TopKV,
form_slot(Rem1,
Rem2,
{IsBasement, TS},
lookup,
Size + 1,
[TopKV|Slot],
TopK);
{{{next_key, TopKV}, Rem1, Rem2}, _} -> {{{next_key, TopKV}, Rem1, Rem2}, _} ->
form_slot(Rem1, form_slot(Rem1,
Rem2, Rem2,
@ -1716,7 +1656,24 @@ additional_range_test() ->
% R8 = sst_getkvrange(P1, element(1, PastEKV), element(1, PastEKV), 2), % R8 = sst_getkvrange(P1, element(1, PastEKV), element(1, PastEKV), 2),
% ?assertMatch([], R8). % ?assertMatch([], R8).
simple_persisted_slotsize_test() ->
{RP, Filename} = {"../test/", "simple_slotsize_test"},
KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 2, 1, 20),
KVList1 = lists:sublist(lists:ukeysort(1, KVList0), ?SLOT_SIZE),
[{FirstKey, _FV}|_Rest] = KVList1,
{LastKey, _LV} = lists:last(KVList1),
{ok, Pid, {FirstKey, LastKey}} = sst_new(RP,
Filename,
1,
KVList1,
length(KVList1)),
lists:foreach(fun({K, V}) ->
?assertMatch({K, V}, sst_get(Pid, K))
end,
KVList1),
ok = sst_close(Pid),
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
simple_persisted_test() -> simple_persisted_test() ->
{RP, Filename} = {"../test/", "simple_test"}, {RP, Filename} = {"../test/", "simple_test"},