Catch SSU packet read errors in one place

IMS PartialBitfield tweaks
Log tweaks
This commit is contained in:
zzz
2014-09-20 12:26:45 +00:00
parent 9e7e2948e3
commit 9c4558d891
4 changed files with 67 additions and 43 deletions

View File

@ -74,6 +74,21 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
* Pull the fragments and ACKs out of the authenticated data packet * Pull the fragments and ACKs out of the authenticated data packet
*/ */
public void receiveData(PeerState from, UDPPacketReader.DataReader data) { public void receiveData(PeerState from, UDPPacketReader.DataReader data) {
try {
rcvData(from, data);
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Bad pkt from: " + from, dfe);
} catch (IndexOutOfBoundsException ioobe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Bad pkt from: " + from, ioobe);
}
}
/**
* Pull the fragments and ACKs out of the authenticated data packet
*/
private void rcvData(PeerState from, UDPPacketReader.DataReader data) throws DataFormatException {
//long beforeMsgs = _context.clock().now(); //long beforeMsgs = _context.clock().now();
int fragmentsIncluded = receiveMessages(from, data); int fragmentsIncluded = receiveMessages(from, data);
//long afterMsgs = _context.clock().now(); //long afterMsgs = _context.clock().now();
@ -95,7 +110,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
* *
* @return number of data fragments included * @return number of data fragments included
*/ */
private int receiveMessages(PeerState from, UDPPacketReader.DataReader data) { private int receiveMessages(PeerState from, UDPPacketReader.DataReader data) throws DataFormatException {
int fragments = data.readFragmentCount(); int fragments = data.readFragmentCount();
if (fragments <= 0) return fragments; if (fragments <= 0) return fragments;
Hash fromPeer = from.getRemotePeer(); Hash fromPeer = from.getRemotePeer();
@ -132,11 +147,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
boolean isNew = false; boolean isNew = false;
state = messages.get(messageId); state = messages.get(messageId);
if (state == null) { if (state == null) {
try { state = new InboundMessageState(_context, mid, fromPeer, data, i);
state = new InboundMessageState(_context, mid, fromPeer, data, i);
} catch (DataFormatException dfe) {
break;
}
isNew = true; isNew = true;
fragmentOK = true; fragmentOK = true;
// we will add to messages shortly if it isn't complete // we will add to messages shortly if it isn't complete
@ -199,7 +210,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
/** /**
* @return the number of bitfields in the ack? why? * @return the number of bitfields in the ack? why?
*/ */
private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) { private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) throws DataFormatException {
int rv = 0; int rv = 0;
boolean newAck = false; boolean newAck = false;
if (data.readACKsIncluded()) { if (data.readACKsIncluded()) {

View File

@ -89,7 +89,7 @@ class InboundMessageState implements CDQEntry {
* *
* @return true if the data was ok, false if it was corrupt * @return true if the data was ok, false if it was corrupt
*/ */
public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) { public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) throws DataFormatException {
int fragmentNum = data.readMessageFragmentNum(dataFragment); int fragmentNum = data.readMessageFragmentNum(dataFragment);
if ( (fragmentNum < 0) || (fragmentNum >= _fragments.length)) { if ( (fragmentNum < 0) || (fragmentNum >= _fragments.length)) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -233,7 +233,6 @@ class InboundMessageState implements CDQEntry {
*/ */
private static final class PartialBitfield implements ACKBitfield { private static final class PartialBitfield implements ACKBitfield {
private final long _bitfieldMessageId; private final long _bitfieldMessageId;
private final int _fragmentCount;
private final int _ackCount; private final int _ackCount;
private final int _highestReceived; private final int _highestReceived;
// bitfield, 1 for acked // bitfield, 1 for acked
@ -241,7 +240,7 @@ class InboundMessageState implements CDQEntry {
/** /**
* @param data each element is non-null or null for received or not * @param data each element is non-null or null for received or not
* @param lastFragment size of data to use * @param size size of data to use
*/ */
public PartialBitfield(long messageId, Object data[], int size) { public PartialBitfield(long messageId, Object data[], int size) {
if (size > MAX_FRAGMENTS) if (size > MAX_FRAGMENTS)
@ -258,7 +257,6 @@ class InboundMessageState implements CDQEntry {
} }
} }
_fragmentAcks = acks; _fragmentAcks = acks;
_fragmentCount = size;
_ackCount = ackCount; _ackCount = ackCount;
_highestReceived = highestReceived; _highestReceived = highestReceived;
} }
@ -270,7 +268,7 @@ class InboundMessageState implements CDQEntry {
return 1L << fragment; return 1L << fragment;
} }
public int fragmentCount() { return _fragmentCount; } public int fragmentCount() { return _highestReceived + 1; }
public int ackCount() { return _ackCount; } public int ackCount() { return _ackCount; }
@ -279,12 +277,12 @@ class InboundMessageState implements CDQEntry {
public long getMessageId() { return _bitfieldMessageId; } public long getMessageId() { return _bitfieldMessageId; }
public boolean received(int fragmentNum) { public boolean received(int fragmentNum) {
if (fragmentNum < 0 || fragmentNum >= _fragmentCount) if (fragmentNum < 0 || fragmentNum > _highestReceived)
return false; return false;
return (_fragmentAcks & mask(fragmentNum)) != 0; return (_fragmentAcks & mask(fragmentNum)) != 0;
} }
public boolean receivedComplete() { return _ackCount == _fragmentCount; } public boolean receivedComplete() { return _ackCount == _highestReceived + 1; }
@Override @Override
public String toString() { public String toString() {
@ -293,11 +291,11 @@ class InboundMessageState implements CDQEntry {
buf.append(_bitfieldMessageId); buf.append(_bitfieldMessageId);
buf.append(" highest: ").append(_highestReceived); buf.append(" highest: ").append(_highestReceived);
buf.append(" with ").append(_ackCount).append(" ACKs for: ["); buf.append(" with ").append(_ackCount).append(" ACKs for: [");
for (int i = 0; i < _fragmentCount; i++) { for (int i = 0; i <= _highestReceived; i++) {
if (received(i)) if (received(i))
buf.append(i).append(' '); buf.append(i).append(' ');
} }
buf.append("] / ").append(_fragmentCount); buf.append("] / ").append(_highestReceived + 1);
return buf.toString(); return buf.toString();
} }
} }

View File

@ -8,6 +8,7 @@ import java.util.concurrent.BlockingQueue;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelBlockingQueue; import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.LHMCache; import net.i2p.util.LHMCache;
@ -691,33 +692,35 @@ class PacketHandler {
state = _establisher.receiveData(outState); state = _establisher.receiveData(outState);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received new DATA packet from " + state + ": " + packet); _log.debug("Received new DATA packet from " + state + ": " + packet);
UDPPacketReader.DataReader dr = reader.getDataReader();
if (state != null) { if (state != null) {
UDPPacketReader.DataReader dr = reader.getDataReader();
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
StringBuilder msg = new StringBuilder(512); StringBuilder msg = new StringBuilder(512);
msg.append("Receive ").append(System.identityHashCode(packet)); msg.append("Receive ").append(System.identityHashCode(packet));
msg.append(" from ").append(state.getRemotePeer().toBase64()).append(" ").append(state.getRemoteHostId()); msg.append(" from ").append(state.getRemotePeer().toBase64()).append(" ").append(state.getRemoteHostId());
for (int i = 0; i < dr.readFragmentCount(); i++) { try {
msg.append(" msg ").append(dr.readMessageId(i)); int count = dr.readFragmentCount();
msg.append(":").append(dr.readMessageFragmentNum(i)); for (int i = 0; i < count; i++) {
if (dr.readMessageIsLast(i)) msg.append(" msg ").append(dr.readMessageId(i));
msg.append("*"); msg.append(":").append(dr.readMessageFragmentNum(i));
} if (dr.readMessageIsLast(i))
msg.append("*");
}
} catch (DataFormatException dfe) {}
msg.append(": ").append(dr.toString()); msg.append(": ").append(dr.toString());
_log.debug(msg.toString()); _log.debug(msg.toString());
} }
//packet.beforeReceiveFragments(); //packet.beforeReceiveFragments();
_inbound.receiveData(state, dr); _inbound.receiveData(state, dr);
_context.statManager().addRateData("udp.receivePacketSize.dataKnown", packet.getPacket().getLength(), packet.getLifetime()); _context.statManager().addRateData("udp.receivePacketSize.dataKnown", packet.getPacket().getLength(), packet.getLifetime());
if (dr.readFragmentCount() <= 0)
_context.statManager().addRateData("udp.receivePacketSize.dataKnownAck", packet.getPacket().getLength(), packet.getLifetime());
} else { } else {
// doesn't happen // doesn't happen
_context.statManager().addRateData("udp.receivePacketSize.dataUnknown", packet.getPacket().getLength(), packet.getLifetime()); _context.statManager().addRateData("udp.receivePacketSize.dataUnknown", packet.getPacket().getLength(), packet.getLifetime());
UDPPacketReader.DataReader dr = reader.getDataReader(); }
try {
if (dr.readFragmentCount() <= 0) if (dr.readFragmentCount() <= 0)
_context.statManager().addRateData("udp.receivePacketSize.dataUnknownAck", packet.getPacket().getLength(), packet.getLifetime()); _context.statManager().addRateData("udp.receivePacketSize.dataUnknownAck", packet.getPacket().getLength(), packet.getLifetime());
} } catch (DataFormatException dfe) {}
break; break;
case UDPPacket.PAYLOAD_TYPE_TEST: case UDPPacket.PAYLOAD_TYPE_TEST:
_state = 51; _state = 51;

View File

@ -2,6 +2,7 @@ package net.i2p.router.transport.udp;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.data.Signature; import net.i2p.data.Signature;
@ -12,6 +13,9 @@ import net.i2p.util.Log;
* the appropriate fields. If the interesting bits are in message specific * the appropriate fields. If the interesting bits are in message specific
* elements, grab the appropriate subreader. * elements, grab the appropriate subreader.
* *
* Many of the methods here and in the subclasses will throw AIOOBE on
* malformed packets, that should be caught also.
*
*/ */
class UDPPacketReader { class UDPPacketReader {
private final I2PAppContext _context; private final I2PAppContext _context;
@ -314,7 +318,7 @@ class UDPPacketReader {
return DataHelper.fromLong(_message, off + (4 * index), 4); return DataHelper.fromLong(_message, off + (4 * index), 4);
} }
public ACKBitfield[] readACKBitfields() { public ACKBitfield[] readACKBitfields() throws DataFormatException {
if (!readACKBitfieldsIncluded()) return null; if (!readACKBitfieldsIncluded()) return null;
int off = readBodyOffset() + 1; int off = readBodyOffset() + 1;
if (readACKsIncluded()) { if (readACKsIncluded()) {
@ -334,7 +338,7 @@ class UDPPacketReader {
return rv; return rv;
} }
public int readFragmentCount() { public int readFragmentCount() throws DataFormatException {
int off = readBodyOffset() + 1; int off = readBodyOffset() + 1;
if (readACKsIncluded()) { if (readACKsIncluded()) {
int numACKs = (int)DataHelper.fromLong(_message, off, 1); int numACKs = (int)DataHelper.fromLong(_message, off, 1);
@ -358,31 +362,31 @@ class UDPPacketReader {
return _message[off]; return _message[off];
} }
public long readMessageId(int fragmentNum) { public long readMessageId(int fragmentNum) throws DataFormatException {
int fragmentBegin = getFragmentBegin(fragmentNum); int fragmentBegin = getFragmentBegin(fragmentNum);
return DataHelper.fromLong(_message, fragmentBegin, 4); return DataHelper.fromLong(_message, fragmentBegin, 4);
} }
public int readMessageFragmentNum(int fragmentNum) { public int readMessageFragmentNum(int fragmentNum) throws DataFormatException {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
off += 4; // messageId off += 4; // messageId
return (_message[off] & 0xFF) >>> 1; return (_message[off] & 0xFF) >>> 1;
} }
public boolean readMessageIsLast(int fragmentNum) { public boolean readMessageIsLast(int fragmentNum) throws DataFormatException {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
off += 4; // messageId off += 4; // messageId
return ((_message[off] & 1) != 0); return ((_message[off] & 1) != 0);
} }
public int readMessageFragmentSize(int fragmentNum) { public int readMessageFragmentSize(int fragmentNum) throws DataFormatException {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
off += 5; // messageId + fragment info off += 5; // messageId + fragment info
return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF; return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
} }
public void readMessageFragment(int fragmentNum, byte target[], int targetOffset) public void readMessageFragment(int fragmentNum, byte target[], int targetOffset)
throws ArrayIndexOutOfBoundsException { throws DataFormatException {
int off = getFragmentBegin(fragmentNum); int off = getFragmentBegin(fragmentNum);
off += 5; // messageId + fragment info off += 5; // messageId + fragment info
int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF; int size = ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
@ -390,7 +394,7 @@ class UDPPacketReader {
System.arraycopy(_message, off, target, targetOffset, size); System.arraycopy(_message, off, target, targetOffset, size);
} }
private int getFragmentBegin(int fragmentNum) { private int getFragmentBegin(int fragmentNum) throws DataFormatException {
int off = readBodyOffset() + 1; int off = readBodyOffset() + 1;
if (readACKsIncluded()) { if (readACKsIncluded()) {
int numACKs = (int)DataHelper.fromLong(_message, off, 1); int numACKs = (int)DataHelper.fromLong(_message, off, 1);
@ -452,10 +456,15 @@ class UDPPacketReader {
off++; off++;
buf.append("with partial ACKs for "); buf.append("with partial ACKs for ");
for (int i = 0; i < numBitfields; i++) { try {
PacketACKBitfield bf = new PacketACKBitfield(off); for (int i = 0; i < numBitfields; i++) {
buf.append(bf.getMessageId()).append(' '); PacketACKBitfield bf = new PacketACKBitfield(off);
off += bf.getByteLength(); buf.append(bf.getMessageId()).append(' ');
off += bf.getByteLength();
}
} catch (DataFormatException dfe) {
buf.append("CORRUPT");
return buf.toString();
} }
} }
if (readExtendedDataIncluded()) { if (readExtendedDataIncluded()) {
@ -492,7 +501,7 @@ class UDPPacketReader {
return buf.toString(); return buf.toString();
} }
public void toRawString(StringBuilder buf) { public void toRawString(StringBuilder buf) throws DataFormatException {
UDPPacketReader.this.toRawString(buf); UDPPacketReader.this.toRawString(buf);
buf.append(" payload: "); buf.append(" payload: ");
@ -513,16 +522,19 @@ class UDPPacketReader {
private final int _bitfieldStart; private final int _bitfieldStart;
private final int _bitfieldSize; private final int _bitfieldSize;
public PacketACKBitfield(int start) { public PacketACKBitfield(int start) throws DataFormatException {
_start = start; _start = start;
_bitfieldStart = start + 4; _bitfieldStart = start + 4;
int bfsz = 1; int bfsz = 1;
// bitfield is an array of bytes where the high bit is 1 if // bitfield is an array of bytes where the high bit is 1 if
// further bytes in the bitfield follow // further bytes in the bitfield follow
while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0) { while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0) {
if (++bfsz > InboundMessageState.MAX_PARTIAL_BITFIELD_BYTES) bfsz++;
throw new IllegalArgumentException(); //if (bfsz > InboundMessageState.MAX_PARTIAL_BITFIELD_BYTES)
// throw new DataFormatException();
} }
if (bfsz > InboundMessageState.MAX_PARTIAL_BITFIELD_BYTES)
throw new DataFormatException("bitfield size: " + bfsz);
_bitfieldSize = bfsz; _bitfieldSize = bfsz;
} }