* if we send a blank ACK message (that will not in turn be ACKed) and it
has session tags within it, send an additional ping to the peer, bundling those tags a second time, ACKing those tags on the pong. * handle packets transferred during a race after the receiver ACKs the connection but before the establisher receives the ACK. * notify the messageInputStream reader on close (duh) * new stream sink test, shoving lots and lots of data down a stream with the existing StreamSinkServer and StreamSinkClient apps * logging
This commit is contained in:
@ -139,16 +139,30 @@ public class Connection {
|
|||||||
packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED);
|
packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean ackOnly = false;
|
||||||
|
|
||||||
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
|
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
|
||||||
// ACK only, no retries
|
ackOnly = true;
|
||||||
} else {
|
} else {
|
||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
|
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
|
||||||
}
|
}
|
||||||
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getResendDelay());
|
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getResendDelay());
|
||||||
}
|
}
|
||||||
|
|
||||||
_lastSendTime = _context.clock().now();
|
_lastSendTime = _context.clock().now();
|
||||||
_outboundQueue.enqueue(packet);
|
_outboundQueue.enqueue(packet);
|
||||||
|
|
||||||
|
if (ackOnly) {
|
||||||
|
// ACK only, don't schedule this packet for retries
|
||||||
|
// however, if we are running low on sessionTags we want to send
|
||||||
|
// something that will get a reply so that we can deliver some new tags -
|
||||||
|
// ACKs don't get ACKed, but pings do.
|
||||||
|
if (packet.getTagsSent().size() > 0) {
|
||||||
|
_log.error("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
|
||||||
|
_connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List ackPackets(long ackThrough, long nacks[]) {
|
List ackPackets(long ackThrough, long nacks[]) {
|
||||||
@ -200,6 +214,9 @@ public class Connection {
|
|||||||
public boolean getIsConnected() { return _connected; }
|
public boolean getIsConnected() { return _connected; }
|
||||||
|
|
||||||
void disconnect(boolean cleanDisconnect) {
|
void disconnect(boolean cleanDisconnect) {
|
||||||
|
disconnect(cleanDisconnect, true);
|
||||||
|
}
|
||||||
|
void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) {
|
||||||
if (!_connected) return;
|
if (!_connected) return;
|
||||||
_connected = false;
|
_connected = false;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
@ -219,6 +236,7 @@ public class Connection {
|
|||||||
synchronized (_outboundPackets) {
|
synchronized (_outboundPackets) {
|
||||||
_outboundPackets.clear();
|
_outboundPackets.clear();
|
||||||
}
|
}
|
||||||
|
if (removeFromConMgr)
|
||||||
_connectionManager.removeConnection(this);
|
_connectionManager.removeConnection(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -347,11 +365,12 @@ public class Connection {
|
|||||||
_packet.setReceiveStreamId(_receiveStreamId);
|
_packet.setReceiveStreamId(_receiveStreamId);
|
||||||
_packet.setSendStreamId(_sendStreamId);
|
_packet.setSendStreamId(_sendStreamId);
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
int numSends = _packet.getNumSends() + 1;
|
||||||
_log.debug("Resend packet " + _packet + " on " + Connection.this);
|
|
||||||
|
if (_log.shouldLog(Log.ERROR))
|
||||||
|
_log.error("Resend packet " + _packet + " time " + numSends + " on " + Connection.this);
|
||||||
_outboundQueue.enqueue(_packet);
|
_outboundQueue.enqueue(_packet);
|
||||||
|
|
||||||
int numSends = _packet.getNumSends();
|
|
||||||
if (numSends > _options.getMaxResends()) {
|
if (numSends > _options.getMaxResends()) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Too many resends");
|
_log.debug("Too many resends");
|
||||||
|
@ -39,8 +39,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
|||||||
if (_connection.getUnackedPacketsReceived() > 0)
|
if (_connection.getUnackedPacketsReceived() > 0)
|
||||||
doSend = true;
|
doSend = true;
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by"));
|
// _log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by"));
|
||||||
|
|
||||||
if (doSend) {
|
if (doSend) {
|
||||||
PacketLocal packet = buildPacket(buf, off, size);
|
PacketLocal packet = buildPacket(buf, off, size);
|
||||||
@ -51,11 +51,12 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean isAckOnly(int size) {
|
private boolean isAckOnly(int size) {
|
||||||
return ( (size <= 0) && // no data
|
boolean ackOnly = ( (size <= 0) && // no data
|
||||||
(_connection.getLastSendId() >= 0) && // not a SYN
|
(_connection.getLastSendId() >= 0) && // not a SYN
|
||||||
( (!_connection.getOutputStream().getClosed()) || // not a CLOSE
|
( (!_connection.getOutputStream().getClosed()) || // not a CLOSE
|
||||||
(_connection.getOutputStream().getClosed() &&
|
(_connection.getOutputStream().getClosed() &&
|
||||||
_connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE
|
_connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE
|
||||||
|
return ackOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
private PacketLocal buildPacket(byte buf[], int off, int size) {
|
private PacketLocal buildPacket(byte buf[], int off, int size) {
|
||||||
|
@ -10,6 +10,9 @@ import net.i2p.I2PAppContext;
|
|||||||
import net.i2p.client.I2PSession;
|
import net.i2p.client.I2PSession;
|
||||||
import net.i2p.data.ByteArray;
|
import net.i2p.data.ByteArray;
|
||||||
import net.i2p.data.Destination;
|
import net.i2p.data.Destination;
|
||||||
|
import net.i2p.data.SessionKey;
|
||||||
|
import net.i2p.util.SimpleTimer;
|
||||||
|
import net.i2p.util.Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Coordinate all of the connections for a single local destination.
|
* Coordinate all of the connections for a single local destination.
|
||||||
@ -18,6 +21,7 @@ import net.i2p.data.Destination;
|
|||||||
*/
|
*/
|
||||||
public class ConnectionManager {
|
public class ConnectionManager {
|
||||||
private I2PAppContext _context;
|
private I2PAppContext _context;
|
||||||
|
private Log _log;
|
||||||
private I2PSession _session;
|
private I2PSession _session;
|
||||||
private MessageHandler _messageHandler;
|
private MessageHandler _messageHandler;
|
||||||
private PacketHandler _packetHandler;
|
private PacketHandler _packetHandler;
|
||||||
@ -34,6 +38,7 @@ public class ConnectionManager {
|
|||||||
|
|
||||||
public ConnectionManager(I2PAppContext context, I2PSession session) {
|
public ConnectionManager(I2PAppContext context, I2PSession session) {
|
||||||
_context = context;
|
_context = context;
|
||||||
|
_log = context.logManager().getLog(ConnectionManager.class);
|
||||||
_connectionByInboundId = new HashMap(32);
|
_connectionByInboundId = new HashMap(32);
|
||||||
_pendingPings = new HashMap(4);
|
_pendingPings = new HashMap(4);
|
||||||
_connectionLock = new Object();
|
_connectionLock = new Object();
|
||||||
@ -125,7 +130,7 @@ public class ConnectionManager {
|
|||||||
synchronized (_connectionLock) {
|
synchronized (_connectionLock) {
|
||||||
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
|
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
|
||||||
Connection con = (Connection)iter.next();
|
Connection con = (Connection)iter.next();
|
||||||
con.disconnect(false);
|
con.disconnect(false, false);
|
||||||
}
|
}
|
||||||
_connectionByInboundId.clear();
|
_connectionByInboundId.clear();
|
||||||
}
|
}
|
||||||
@ -144,22 +149,34 @@ public class ConnectionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean ping(Destination peer, long timeoutMs) {
|
public boolean ping(Destination peer, long timeoutMs) {
|
||||||
PingRequest req = new PingRequest();
|
return ping(peer, timeoutMs, true);
|
||||||
|
}
|
||||||
|
public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
|
||||||
|
return ping(peer, timeoutMs, blocking, null, null);
|
||||||
|
}
|
||||||
|
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend) {
|
||||||
byte id[] = new byte[4];
|
byte id[] = new byte[4];
|
||||||
_context.random().nextBytes(id);
|
_context.random().nextBytes(id);
|
||||||
ByteArray ba = new ByteArray(id);
|
ByteArray ba = new ByteArray(id);
|
||||||
|
|
||||||
synchronized (_pendingPings) {
|
|
||||||
_pendingPings.put(ba, req);
|
|
||||||
}
|
|
||||||
|
|
||||||
PacketLocal packet = new PacketLocal(_context, peer);
|
PacketLocal packet = new PacketLocal(_context, peer);
|
||||||
packet.setSendStreamId(id);
|
packet.setSendStreamId(id);
|
||||||
packet.setFlag(Packet.FLAG_ECHO);
|
packet.setFlag(Packet.FLAG_ECHO);
|
||||||
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
|
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
|
||||||
packet.setOptionalFrom(_session.getMyDestination());
|
packet.setOptionalFrom(_session.getMyDestination());
|
||||||
|
if ( (keyToUse != null) && (tagsToSend != null) ) {
|
||||||
|
packet.setKeyUsed(keyToUse);
|
||||||
|
packet.setTagsSent(tagsToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
PingRequest req = new PingRequest(peer, packet);
|
||||||
|
|
||||||
|
synchronized (_pendingPings) {
|
||||||
|
_pendingPings.put(ba, req);
|
||||||
|
}
|
||||||
|
|
||||||
_outboundQueue.enqueue(packet);
|
_outboundQueue.enqueue(packet);
|
||||||
|
|
||||||
|
if (blocking) {
|
||||||
synchronized (req) {
|
synchronized (req) {
|
||||||
if (!req.pongReceived())
|
if (!req.pongReceived())
|
||||||
try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
|
try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
|
||||||
@ -168,18 +185,41 @@ public class ConnectionManager {
|
|||||||
synchronized (_pendingPings) {
|
synchronized (_pendingPings) {
|
||||||
_pendingPings.remove(ba);
|
_pendingPings.remove(ba);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
SimpleTimer.getInstance().addEvent(new PingFailed(ba), timeoutMs);
|
||||||
|
}
|
||||||
|
|
||||||
boolean ok = req.pongReceived();
|
boolean ok = req.pongReceived();
|
||||||
if (ok) {
|
|
||||||
_context.sessionKeyManager().tagsDelivered(peer.getPublicKey(), packet.getKeyUsed(), packet.getTagsSent());
|
|
||||||
}
|
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class PingFailed implements SimpleTimer.TimedEvent {
|
||||||
|
private ByteArray _ba;
|
||||||
|
public PingFailed(ByteArray ba) { _ba = ba; }
|
||||||
|
public void timeReached() {
|
||||||
|
boolean removed = false;
|
||||||
|
synchronized (_pendingPings) {
|
||||||
|
Object o = _pendingPings.remove(_ba);
|
||||||
|
if (o != null)
|
||||||
|
removed = true;
|
||||||
|
}
|
||||||
|
if (removed)
|
||||||
|
_log.error("Ping failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class PingRequest {
|
private class PingRequest {
|
||||||
private boolean _ponged;
|
private boolean _ponged;
|
||||||
public PingRequest() { _ponged = false; }
|
private Destination _peer;
|
||||||
|
private PacketLocal _packet;
|
||||||
|
public PingRequest(Destination peer, PacketLocal packet) {
|
||||||
|
_ponged = false;
|
||||||
|
_peer = peer;
|
||||||
|
_packet = packet;
|
||||||
|
}
|
||||||
public void pong() {
|
public void pong() {
|
||||||
|
_log.error("Ping successful");
|
||||||
|
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
|
||||||
synchronized (ConnectionManager.PingRequest.this) {
|
synchronized (ConnectionManager.PingRequest.this) {
|
||||||
_ponged = true;
|
_ponged = true;
|
||||||
ConnectionManager.PingRequest.this.notifyAll();
|
ConnectionManager.PingRequest.this.notifyAll();
|
||||||
|
@ -55,7 +55,7 @@ public class ConnectionOptions extends I2PSocketOptions {
|
|||||||
setRTT(5*1000);
|
setRTT(5*1000);
|
||||||
setReceiveWindow(1);
|
setReceiveWindow(1);
|
||||||
setResendDelay(5*1000);
|
setResendDelay(5*1000);
|
||||||
setSendAckDelay(1*1000);
|
setSendAckDelay(2*1000);
|
||||||
setWindowSize(1);
|
setWindowSize(1);
|
||||||
setMaxResends(10);
|
setMaxResends(10);
|
||||||
}
|
}
|
||||||
|
@ -169,9 +169,10 @@ public class ConnectionPacketHandler {
|
|||||||
_log.warn("Received unsigned / forged packet: " + packet);
|
_log.warn("Received unsigned / forged packet: " + packet);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (packet.isFlagSet(Packet.FLAG_CLOSE))
|
if (packet.isFlagSet(Packet.FLAG_CLOSE)) {
|
||||||
con.closeReceived();
|
con.closeReceived();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,19 @@ public class I2PSocketManagerFull implements I2PSocketManager {
|
|||||||
*/
|
*/
|
||||||
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
|
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
|
||||||
|
|
||||||
|
public I2PSocketManagerFull() {
|
||||||
|
_context = null;
|
||||||
|
_session = null;
|
||||||
|
}
|
||||||
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
|
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
|
||||||
|
this();
|
||||||
|
init(context, session, opts, name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
|
||||||
_context = context;
|
_context = context;
|
||||||
_session = session;
|
_session = session;
|
||||||
_log = _context.logManager().getLog(I2PSocketManagerFull.class);
|
_log = _context.logManager().getLog(I2PSocketManagerFull.class);
|
||||||
|
@ -149,7 +149,11 @@ public class MessageInputStream extends InputStream {
|
|||||||
|
|
||||||
public void closeReceived() {
|
public void closeReceived() {
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Close received, ready size: " + _readyDataBlocks.size()
|
||||||
|
+ " not ready: " + _notYetReadyBlocks.size(), new Exception("closed"));
|
||||||
_closeReceived = true;
|
_closeReceived = true;
|
||||||
|
_dataLock.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,8 +37,8 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void write(byte b[], int off, int len) throws IOException {
|
public void write(byte b[], int off, int len) throws IOException {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("write(b[], " + off + ", " + len + ")");
|
// _log.debug("write(b[], " + off + ", " + len + ")");
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
int cur = off;
|
int cur = off;
|
||||||
int remaining = len;
|
int remaining = len;
|
||||||
@ -49,8 +49,8 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
_valid += remaining;
|
_valid += remaining;
|
||||||
cur += remaining;
|
cur += remaining;
|
||||||
remaining = 0;
|
remaining = 0;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining);
|
// _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining);
|
||||||
} else {
|
} else {
|
||||||
// buffer whatever we can fit then flush,
|
// buffer whatever we can fit then flush,
|
||||||
// repeating until we've pushed all of the
|
// repeating until we've pushed all of the
|
||||||
@ -60,14 +60,14 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
remaining -= toWrite;
|
remaining -= toWrite;
|
||||||
cur += toWrite;
|
cur += toWrite;
|
||||||
_valid = _buf.length;
|
_valid = _buf.length;
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining);
|
// _log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining);
|
||||||
// this blocks until the packet is ack window is open. it
|
// this blocks until the packet is ack window is open. it
|
||||||
// also throws InterruptedIOException if the write timeout
|
// also throws InterruptedIOException if the write timeout
|
||||||
// expires
|
// expires
|
||||||
_dataReceiver.writeData(_buf, 0, _valid);
|
_dataReceiver.writeData(_buf, 0, _valid);
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining);
|
// _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining);
|
||||||
_valid = 0;
|
_valid = 0;
|
||||||
throwAnyError();
|
throwAnyError();
|
||||||
}
|
}
|
||||||
@ -83,14 +83,14 @@ public class MessageOutputStream extends OutputStream {
|
|||||||
|
|
||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
synchronized (_dataLock) {
|
synchronized (_dataLock) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("flush(): valid = " + _valid);
|
// _log.debug("flush(): valid = " + _valid);
|
||||||
// this blocks until the packet is ack window is open. it
|
// this blocks until the packet is ack window is open. it
|
||||||
// also throws InterruptedIOException if the write timeout
|
// also throws InterruptedIOException if the write timeout
|
||||||
// expires
|
// expires
|
||||||
_dataReceiver.writeData(_buf, 0, _valid);
|
_dataReceiver.writeData(_buf, 0, _valid);
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("flush(): valid = " + _valid + " complete");
|
// _log.debug("flush(): valid = " + _valid + " complete");
|
||||||
_valid = 0;
|
_valid = 0;
|
||||||
}
|
}
|
||||||
throwAnyError();
|
throwAnyError();
|
||||||
|
@ -493,10 +493,21 @@ public class Packet {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Packet " + _sequenceNum + " on " + toId(_sendStreamId)
|
StringBuffer buf = new StringBuffer(64);
|
||||||
+ "<-->" + toId(_receiveStreamId) + ": " + toFlagString()
|
buf.append(toId(_sendStreamId));
|
||||||
+ " ACK through " + _ackThrough
|
//buf.append("<-->");
|
||||||
+ " size: " + (_payload != null ? _payload.length : 0);
|
buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum);
|
||||||
|
buf.append(" ").append(toFlagString());
|
||||||
|
buf.append(" ACK ").append(_ackThrough);
|
||||||
|
if (_nacks != null) {
|
||||||
|
buf.append(" NACK");
|
||||||
|
for (int i = 0; i < _nacks.length; i++) {
|
||||||
|
buf.append(" ").append(_nacks[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ( (_payload != null) && (_payload.length > 0) )
|
||||||
|
buf.append(" data: ").append(_payload.length);
|
||||||
|
return buf.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final String toId(byte id[]) {
|
private static final String toId(byte id[]) {
|
||||||
@ -512,7 +523,7 @@ public class Packet {
|
|||||||
if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY");
|
if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY");
|
||||||
if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO");
|
if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO");
|
||||||
if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM");
|
if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM");
|
||||||
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MAXSIZE");
|
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS");
|
||||||
if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE");
|
if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE");
|
||||||
if (isFlagSet(FLAG_RESET)) buf.append(" RESET");
|
if (isFlagSet(FLAG_RESET)) buf.append(" RESET");
|
||||||
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG");
|
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG");
|
||||||
|
@ -3,6 +3,7 @@ package net.i2p.client.streaming;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
import net.i2p.I2PAppContext;
|
||||||
import net.i2p.data.Base64;
|
import net.i2p.data.Base64;
|
||||||
@ -36,10 +37,20 @@ public class PacketHandler {
|
|||||||
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
|
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
|
||||||
if (con != null) {
|
if (con != null) {
|
||||||
receiveKnownCon(con, packet);
|
receiveKnownCon(con, packet);
|
||||||
System.out.println(new Date() + ": Receive packet " + packet + " on con " + con);
|
displayPacket(packet, con);
|
||||||
} else {
|
} else {
|
||||||
receiveUnknownCon(packet, sendId);
|
receiveUnknownCon(packet, sendId);
|
||||||
System.out.println(new Date() + ": Receive packet " + packet + " on an unknown con");
|
displayPacket(packet, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void displayPacket(Packet packet, Connection con) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG)) {
|
||||||
|
//SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS");
|
||||||
|
//String now = fmt.format(new Date());
|
||||||
|
String msg = packet + (con != null ? " on " + con : " on unknown con");
|
||||||
|
_log.debug(msg);
|
||||||
|
// System.out.println(now + ": " + msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,15 +106,21 @@ public class PacketHandler {
|
|||||||
} else {
|
} else {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Packet received on an unknown stream (and not a SYN): " + packet);
|
_log.debug("Packet received on an unknown stream (and not a SYN): " + packet);
|
||||||
if (packet.getSendStreamId() == null) {
|
if (sendId == null) {
|
||||||
for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) {
|
for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) {
|
||||||
Connection con = (Connection)iter.next();
|
Connection con = (Connection)iter.next();
|
||||||
if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId()) &&
|
if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) {
|
||||||
con.getAckedPackets() <= 0) {
|
if (con.getAckedPackets() <= 0) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Received additional packets before the syn on " + con + ": " + packet);
|
_log.debug("Received additional packets before the syn on " + con + ": " + packet);
|
||||||
receiveKnownCon(con, packet);
|
receiveKnownCon(con, packet);
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets());
|
||||||
|
receiveKnownCon(con, packet);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -114,7 +131,9 @@ public class PacketHandler {
|
|||||||
Connection con = (Connection)iter.next();
|
Connection con = (Connection)iter.next();
|
||||||
buf.append(Base64.encode(con.getReceiveStreamId())).append(" ");
|
buf.append(Base64.encode(con.getReceiveStreamId())).append(" ");
|
||||||
}
|
}
|
||||||
_log.warn("Packet belongs to know other cons: " + packet + " connections: " + buf.toString());
|
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
|
||||||
|
+ buf.toString() + " sendId: "
|
||||||
|
+ (sendId != null ? Base64.encode(sendId) : " unknown"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,15 @@ public class PacketLocal extends Packet {
|
|||||||
public void setKeyUsed(SessionKey key) { _keyUsed = key; }
|
public void setKeyUsed(SessionKey key) { _keyUsed = key; }
|
||||||
|
|
||||||
public Set getTagsSent() { return _tagsSent; }
|
public Set getTagsSent() { return _tagsSent; }
|
||||||
public void setTagsSent(Set tags) { _tagsSent = tags; }
|
public void setTagsSent(Set tags) {
|
||||||
|
if ( (_tagsSent != null) && (_tagsSent.size() > 0) && (tags.size() > 0) ) {
|
||||||
|
int old = _tagsSent.size();
|
||||||
|
_tagsSent.addAll(tags);
|
||||||
|
//System.out.println("Dup tags set on " +toString() + " old=" + old + " new=" + tags.size());
|
||||||
|
} else {
|
||||||
|
_tagsSent = tags;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public boolean shouldSign() {
|
public boolean shouldSign() {
|
||||||
return isFlagSet(FLAG_SIGNATURE_INCLUDED) ||
|
return isFlagSet(FLAG_SIGNATURE_INCLUDED) ||
|
||||||
|
@ -35,8 +35,12 @@ class PacketQueue {
|
|||||||
else
|
else
|
||||||
size = packet.writePacket(_buf, 0);
|
size = packet.writePacket(_buf, 0);
|
||||||
|
|
||||||
SessionKey keyUsed = new SessionKey();
|
SessionKey keyUsed = packet.getKeyUsed();
|
||||||
Set tagsSent = new HashSet();
|
if (keyUsed == null)
|
||||||
|
keyUsed = new SessionKey();
|
||||||
|
Set tagsSent = packet.getTagsSent();
|
||||||
|
if (tagsSent == null)
|
||||||
|
tagsSent = new HashSet();
|
||||||
try {
|
try {
|
||||||
// this should not block!
|
// this should not block!
|
||||||
boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
|
boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
|
||||||
@ -47,6 +51,13 @@ class PacketQueue {
|
|||||||
packet.setKeyUsed(keyUsed);
|
packet.setKeyUsed(keyUsed);
|
||||||
packet.setTagsSent(tagsSent);
|
packet.setTagsSent(tagsSent);
|
||||||
packet.incrementSends();
|
packet.incrementSends();
|
||||||
|
if (_log.shouldLog(Log.DEBUG)) {
|
||||||
|
String msg = packet + " sent" + (tagsSent.size() > 0
|
||||||
|
? " with " + tagsSent.size() + " tags"
|
||||||
|
: "")
|
||||||
|
+ " send # " + packet.getNumSends();
|
||||||
|
_log.debug(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (I2PSessionException ise) {
|
} catch (I2PSessionException ise) {
|
||||||
if (_log.shouldLog(Log.WARN))
|
if (_log.shouldLog(Log.WARN))
|
||||||
|
@ -28,8 +28,8 @@ class SchedulerChooser {
|
|||||||
for (int i = 0; i < _schedulers.size(); i++) {
|
for (int i = 0; i < _schedulers.size(); i++) {
|
||||||
TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i);
|
TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i);
|
||||||
if (scheduler.accept(con)) {
|
if (scheduler.accept(con)) {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName());
|
// _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName());
|
||||||
return scheduler;
|
return scheduler;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,8 +28,8 @@ abstract class SchedulerImpl implements TaskScheduler {
|
|||||||
_addedBy = new Exception("added by");
|
_addedBy = new Exception("added by");
|
||||||
}
|
}
|
||||||
public void timeReached() {
|
public void timeReached() {
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
//if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("firing event on " + _connection, _addedBy);
|
// _log.debug("firing event on " + _connection, _addedBy);
|
||||||
_connection.eventOccurred();
|
_connection.eventOccurred();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,56 @@
|
|||||||
|
package net.i2p.client.streaming;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class StreamSinkTest {
|
||||||
|
public static void main(String args[]) {
|
||||||
|
System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName());
|
||||||
|
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
StreamSinkServer.main(new String[] { "streamSinkTestDir", "streamSinkTestServer.key" });
|
||||||
|
}
|
||||||
|
}, "server").start();
|
||||||
|
|
||||||
|
try { Thread.sleep(30*1000); } catch (Exception e) {}
|
||||||
|
|
||||||
|
//run(256, 10000);
|
||||||
|
//run(256, 1000);
|
||||||
|
//run(1024, 10);
|
||||||
|
run(32*1024, 1);
|
||||||
|
//run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1);
|
||||||
|
//run(512*1024, 1);
|
||||||
|
try { Thread.sleep(10*1000); } catch (InterruptedException e) {}
|
||||||
|
System.out.println("Shutting down");
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void run(final int kb, final int msBetweenWrites) {
|
||||||
|
Thread t = new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
StreamSinkClient.main(new String[] { kb+"", msBetweenWrites+"", "streamSinkTestServer.key" });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
System.out.println("client and server started: size = " + kb + "KB, delay = " + msBetweenWrites);
|
||||||
|
try {
|
||||||
|
t.join();
|
||||||
|
} catch (InterruptedException ie) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void run(final String filename, final int msBetweenWrites) {
|
||||||
|
Thread t = new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
StreamSinkSend.main(new String[] { filename, msBetweenWrites+"", "streamSinkTestServer.key" });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
System.out.println("client and server started: file " + filename + ", delay = " + msBetweenWrites);
|
||||||
|
try {
|
||||||
|
t.join();
|
||||||
|
} catch (InterruptedException ie) {}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user