diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 6a4b02b46..6513f3331 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -139,16 +139,30 @@ public class Connection { packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED); } + boolean ackOnly = false; + if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { - // ACK only, no retries + ackOnly = true; } else { synchronized (_outboundPackets) { _outboundPackets.put(new Long(packet.getSequenceNum()), packet); } SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getResendDelay()); } + _lastSendTime = _context.clock().now(); - _outboundQueue.enqueue(packet); + _outboundQueue.enqueue(packet); + + if (ackOnly) { + // ACK only, don't schedule this packet for retries + // however, if we are running low on sessionTags we want to send + // something that will get a reply so that we can deliver some new tags - + // ACKs don't get ACKed, but pings do. + if (packet.getTagsSent().size() > 0) { + _log.error("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags"); + _connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent()); + } + } } List ackPackets(long ackThrough, long nacks[]) { @@ -200,6 +214,9 @@ public class Connection { public boolean getIsConnected() { return _connected; } void disconnect(boolean cleanDisconnect) { + disconnect(cleanDisconnect, true); + } + void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { if (!_connected) return; _connected = false; if (_log.shouldLog(Log.DEBUG)) @@ -219,7 +236,8 @@ public class Connection { synchronized (_outboundPackets) { _outboundPackets.clear(); } - _connectionManager.removeConnection(this); + if (removeFromConMgr) + _connectionManager.removeConnection(this); } } @@ -347,11 +365,12 @@ public class Connection { _packet.setReceiveStreamId(_receiveStreamId); _packet.setSendStreamId(_sendStreamId); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resend packet " + _packet + " on " + Connection.this); + int numSends = _packet.getNumSends() + 1; + + if (_log.shouldLog(Log.ERROR)) + _log.error("Resend packet " + _packet + " time " + numSends + " on " + Connection.this); _outboundQueue.enqueue(_packet); - int numSends = _packet.getNumSends(); if (numSends > _options.getMaxResends()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Too many resends"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 1d854ca82..4f4b76cb2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -39,8 +39,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if (_connection.getUnackedPacketsReceived() > 0) doSend = true; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by")); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by")); if (doSend) { PacketLocal packet = buildPacket(buf, off, size); @@ -51,11 +51,12 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { } private boolean isAckOnly(int size) { - return ( (size <= 0) && // no data - (_connection.getLastSendId() >= 0) && // not a SYN - ( (!_connection.getOutputStream().getClosed()) || // not a CLOSE - (_connection.getOutputStream().getClosed() && - _connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE + boolean ackOnly = ( (size <= 0) && // no data + (_connection.getLastSendId() >= 0) && // not a SYN + ( (!_connection.getOutputStream().getClosed()) || // not a CLOSE + (_connection.getOutputStream().getClosed() && + _connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE + return ackOnly; } private PacketLocal buildPacket(byte buf[], int off, int size) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 9634de5d9..e1a7a8852 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -10,6 +10,9 @@ import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.data.ByteArray; import net.i2p.data.Destination; +import net.i2p.data.SessionKey; +import net.i2p.util.SimpleTimer; +import net.i2p.util.Log; /** * Coordinate all of the connections for a single local destination. @@ -18,6 +21,7 @@ import net.i2p.data.Destination; */ public class ConnectionManager { private I2PAppContext _context; + private Log _log; private I2PSession _session; private MessageHandler _messageHandler; private PacketHandler _packetHandler; @@ -34,6 +38,7 @@ public class ConnectionManager { public ConnectionManager(I2PAppContext context, I2PSession session) { _context = context; + _log = context.logManager().getLog(ConnectionManager.class); _connectionByInboundId = new HashMap(32); _pendingPings = new HashMap(4); _connectionLock = new Object(); @@ -125,7 +130,7 @@ public class ConnectionManager { synchronized (_connectionLock) { for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - con.disconnect(false); + con.disconnect(false, false); } _connectionByInboundId.clear(); } @@ -144,42 +149,77 @@ public class ConnectionManager { } public boolean ping(Destination peer, long timeoutMs) { - PingRequest req = new PingRequest(); + return ping(peer, timeoutMs, true); + } + public boolean ping(Destination peer, long timeoutMs, boolean blocking) { + return ping(peer, timeoutMs, blocking, null, null); + } + public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend) { byte id[] = new byte[4]; _context.random().nextBytes(id); ByteArray ba = new ByteArray(id); - - synchronized (_pendingPings) { - _pendingPings.put(ba, req); - } - PacketLocal packet = new PacketLocal(_context, peer); packet.setSendStreamId(id); packet.setFlag(Packet.FLAG_ECHO); packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); packet.setOptionalFrom(_session.getMyDestination()); - _outboundQueue.enqueue(packet); - - synchronized (req) { - if (!req.pongReceived()) - try { req.wait(timeoutMs); } catch (InterruptedException ie) {} + if ( (keyToUse != null) && (tagsToSend != null) ) { + packet.setKeyUsed(keyToUse); + packet.setTagsSent(tagsToSend); } + PingRequest req = new PingRequest(peer, packet); + synchronized (_pendingPings) { - _pendingPings.remove(ba); + _pendingPings.put(ba, req); + } + + _outboundQueue.enqueue(packet); + + if (blocking) { + synchronized (req) { + if (!req.pongReceived()) + try { req.wait(timeoutMs); } catch (InterruptedException ie) {} + } + + synchronized (_pendingPings) { + _pendingPings.remove(ba); + } + } else { + SimpleTimer.getInstance().addEvent(new PingFailed(ba), timeoutMs); } boolean ok = req.pongReceived(); - if (ok) { - _context.sessionKeyManager().tagsDelivered(peer.getPublicKey(), packet.getKeyUsed(), packet.getTagsSent()); - } return ok; } + private class PingFailed implements SimpleTimer.TimedEvent { + private ByteArray _ba; + public PingFailed(ByteArray ba) { _ba = ba; } + public void timeReached() { + boolean removed = false; + synchronized (_pendingPings) { + Object o = _pendingPings.remove(_ba); + if (o != null) + removed = true; + } + if (removed) + _log.error("Ping failed"); + } + } + private class PingRequest { private boolean _ponged; - public PingRequest() { _ponged = false; } + private Destination _peer; + private PacketLocal _packet; + public PingRequest(Destination peer, PacketLocal packet) { + _ponged = false; + _peer = peer; + _packet = packet; + } public void pong() { + _log.error("Ping successful"); + _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); synchronized (ConnectionManager.PingRequest.this) { _ponged = true; ConnectionManager.PingRequest.this.notifyAll(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 89cbd5c51..190d280e8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -55,7 +55,7 @@ public class ConnectionOptions extends I2PSocketOptions { setRTT(5*1000); setReceiveWindow(1); setResendDelay(5*1000); - setSendAckDelay(1*1000); + setSendAckDelay(2*1000); setWindowSize(1); setMaxResends(10); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index a4f2323f7..fa9e08402 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -169,8 +169,9 @@ public class ConnectionPacketHandler { _log.warn("Received unsigned / forged packet: " + packet); return false; } - if (packet.isFlagSet(Packet.FLAG_CLOSE)) + if (packet.isFlagSet(Packet.FLAG_CLOSE)) { con.closeReceived(); + } } return true; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 4b98544f9..3c3ba65f3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -45,7 +45,19 @@ public class I2PSocketManagerFull implements I2PSocketManager { */ private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; + public I2PSocketManagerFull() { + _context = null; + _session = null; + } public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { + this(); + init(context, session, opts, name); + } + + /** + * + */ + public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; _session = session; _log = _context.logManager().getLog(I2PSocketManagerFull.class); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 080ab6917..9a3880737 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -149,7 +149,11 @@ public class MessageInputStream extends InputStream { public void closeReceived() { synchronized (_dataLock) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Close received, ready size: " + _readyDataBlocks.size() + + " not ready: " + _notYetReadyBlocks.size(), new Exception("closed")); _closeReceived = true; + _dataLock.notifyAll(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 8229abd88..79d72a40f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -37,8 +37,8 @@ public class MessageOutputStream extends OutputStream { } public void write(byte b[], int off, int len) throws IOException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("write(b[], " + off + ", " + len + ")"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("write(b[], " + off + ", " + len + ")"); synchronized (_dataLock) { int cur = off; int remaining = len; @@ -49,8 +49,8 @@ public class MessageOutputStream extends OutputStream { _valid += remaining; cur += remaining; remaining = 0; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining); } else { // buffer whatever we can fit then flush, // repeating until we've pushed all of the @@ -60,14 +60,14 @@ public class MessageOutputStream extends OutputStream { remaining -= toWrite; cur += toWrite; _valid = _buf.length; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining); // this blocks until the packet is ack window is open. it // also throws InterruptedIOException if the write timeout // expires _dataReceiver.writeData(_buf, 0, _valid); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining); _valid = 0; throwAnyError(); } @@ -83,14 +83,14 @@ public class MessageOutputStream extends OutputStream { public void flush() throws IOException { synchronized (_dataLock) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("flush(): valid = " + _valid); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("flush(): valid = " + _valid); // this blocks until the packet is ack window is open. it // also throws InterruptedIOException if the write timeout // expires _dataReceiver.writeData(_buf, 0, _valid); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("flush(): valid = " + _valid + " complete"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("flush(): valid = " + _valid + " complete"); _valid = 0; } throwAnyError(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 335801196..292f75c32 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -493,10 +493,21 @@ public class Packet { } public String toString() { - return "Packet " + _sequenceNum + " on " + toId(_sendStreamId) - + "<-->" + toId(_receiveStreamId) + ": " + toFlagString() - + " ACK through " + _ackThrough - + " size: " + (_payload != null ? _payload.length : 0); + StringBuffer buf = new StringBuffer(64); + buf.append(toId(_sendStreamId)); + //buf.append("<-->"); + buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum); + buf.append(" ").append(toFlagString()); + buf.append(" ACK ").append(_ackThrough); + if (_nacks != null) { + buf.append(" NACK"); + for (int i = 0; i < _nacks.length; i++) { + buf.append(" ").append(_nacks[i]); + } + } + if ( (_payload != null) && (_payload.length > 0) ) + buf.append(" data: ").append(_payload.length); + return buf.toString(); } private static final String toId(byte id[]) { @@ -512,7 +523,7 @@ public class Packet { if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY"); if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO"); if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM"); - if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MAXSIZE"); + if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS"); if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE"); if (isFlagSet(FLAG_RESET)) buf.append(" RESET"); if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index c500fe9c2..aae8fee22 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -3,6 +3,7 @@ package net.i2p.client.streaming; import java.util.Date; import java.util.Iterator; import java.util.Set; +import java.text.SimpleDateFormat; import net.i2p.I2PAppContext; import net.i2p.data.Base64; @@ -36,10 +37,20 @@ public class PacketHandler { Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { receiveKnownCon(con, packet); - System.out.println(new Date() + ": Receive packet " + packet + " on con " + con); + displayPacket(packet, con); } else { receiveUnknownCon(packet, sendId); - System.out.println(new Date() + ": Receive packet " + packet + " on an unknown con"); + displayPacket(packet, null); + } + } + + private void displayPacket(Packet packet, Connection con) { + if (_log.shouldLog(Log.DEBUG)) { + //SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS"); + //String now = fmt.format(new Date()); + String msg = packet + (con != null ? " on " + con : " on unknown con"); + _log.debug(msg); + // System.out.println(now + ": " + msg); } } @@ -95,15 +106,21 @@ public class PacketHandler { } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received on an unknown stream (and not a SYN): " + packet); - if (packet.getSendStreamId() == null) { + if (sendId == null) { for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId()) && - con.getAckedPackets() <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received additional packets before the syn on " + con + ": " + packet); - receiveKnownCon(con, packet); - return; + if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) { + if (con.getAckedPackets() <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received additional packets before the syn on " + con + ": " + packet); + receiveKnownCon(con, packet); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets()); + receiveKnownCon(con, packet); + return; + } } } } @@ -114,7 +131,9 @@ public class PacketHandler { Connection con = (Connection)iter.next(); buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); } - _log.warn("Packet belongs to know other cons: " + packet + " connections: " + buf.toString()); + _log.warn("Packet belongs to no other cons: " + packet + " connections: " + + buf.toString() + " sendId: " + + (sendId != null ? Base64.encode(sendId) : " unknown")); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index a2bb107d9..50fab40aa 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -34,7 +34,15 @@ public class PacketLocal extends Packet { public void setKeyUsed(SessionKey key) { _keyUsed = key; } public Set getTagsSent() { return _tagsSent; } - public void setTagsSent(Set tags) { _tagsSent = tags; } + public void setTagsSent(Set tags) { + if ( (_tagsSent != null) && (_tagsSent.size() > 0) && (tags.size() > 0) ) { + int old = _tagsSent.size(); + _tagsSent.addAll(tags); + //System.out.println("Dup tags set on " +toString() + " old=" + old + " new=" + tags.size()); + } else { + _tagsSent = tags; + } + } public boolean shouldSign() { return isFlagSet(FLAG_SIGNATURE_INCLUDED) || diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 0681fc374..37b49297a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -35,8 +35,12 @@ class PacketQueue { else size = packet.writePacket(_buf, 0); - SessionKey keyUsed = new SessionKey(); - Set tagsSent = new HashSet(); + SessionKey keyUsed = packet.getKeyUsed(); + if (keyUsed == null) + keyUsed = new SessionKey(); + Set tagsSent = packet.getTagsSent(); + if (tagsSent == null) + tagsSent = new HashSet(); try { // this should not block! boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); @@ -47,6 +51,13 @@ class PacketQueue { packet.setKeyUsed(keyUsed); packet.setTagsSent(tagsSent); packet.incrementSends(); + if (_log.shouldLog(Log.DEBUG)) { + String msg = packet + " sent" + (tagsSent.size() > 0 + ? " with " + tagsSent.size() + " tags" + : "") + + " send # " + packet.getNumSends(); + _log.debug(msg); + } } } catch (I2PSessionException ise) { if (_log.shouldLog(Log.WARN)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java index c52f6c4f5..215a9e5e2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java @@ -28,8 +28,8 @@ class SchedulerChooser { for (int i = 0; i < _schedulers.size(); i++) { TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i); if (scheduler.accept(con)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName()); return scheduler; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java index b6fbca2eb..5b6740849 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -28,8 +28,8 @@ abstract class SchedulerImpl implements TaskScheduler { _addedBy = new Exception("added by"); } public void timeReached() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("firing event on " + _connection, _addedBy); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("firing event on " + _connection, _addedBy); _connection.eventOccurred(); } } diff --git a/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java b/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java new file mode 100644 index 000000000..a367f3995 --- /dev/null +++ b/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java @@ -0,0 +1,56 @@ +package net.i2p.client.streaming; + +/** + * + */ +public class StreamSinkTest { + public static void main(String args[]) { + System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName()); + + new Thread(new Runnable() { + public void run() { + StreamSinkServer.main(new String[] { "streamSinkTestDir", "streamSinkTestServer.key" }); + } + }, "server").start(); + + try { Thread.sleep(30*1000); } catch (Exception e) {} + + //run(256, 10000); + //run(256, 1000); + //run(1024, 10); + run(32*1024, 1); + //run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1); + //run(512*1024, 1); + try { Thread.sleep(10*1000); } catch (InterruptedException e) {} + System.out.println("Shutting down"); + System.exit(0); + } + + private static void run(final int kb, final int msBetweenWrites) { + Thread t = new Thread(new Runnable() { + public void run() { + StreamSinkClient.main(new String[] { kb+"", msBetweenWrites+"", "streamSinkTestServer.key" }); + } + }); + t.start(); + + System.out.println("client and server started: size = " + kb + "KB, delay = " + msBetweenWrites); + try { + t.join(); + } catch (InterruptedException ie) {} + } + + private static void run(final String filename, final int msBetweenWrites) { + Thread t = new Thread(new Runnable() { + public void run() { + StreamSinkSend.main(new String[] { filename, msBetweenWrites+"", "streamSinkTestServer.key" }); + } + }); + t.start(); + + System.out.println("client and server started: file " + filename + ", delay = " + msBetweenWrites); + try { + t.join(); + } catch (InterruptedException ie) {} + } +}