diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java
index beee90f38..7ad9de771 100644
--- a/router/java/src/net/i2p/router/JobQueue.java
+++ b/router/java/src/net/i2p/router/JobQueue.java
@@ -617,6 +617,7 @@ public class JobQueue {
buf.append(j.toString()).append("\n");
}
buf.append("\n");
+ out.flush();
buf.append("# timed jobs: ").append(timedJobs.size()).append("
\n");
TreeMap ordered = new TreeMap();
diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java
index 4d76ed20e..27bdf1523 100644
--- a/router/java/src/net/i2p/router/OutNetMessage.java
+++ b/router/java/src/net/i2p/router/OutNetMessage.java
@@ -21,6 +21,7 @@ import java.util.Set;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
+import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.util.Log;
@@ -34,6 +35,7 @@ public class OutNetMessage {
private Log _log;
private RouterContext _context;
private RouterInfo _target;
+ private Hash _targetHash;
private I2NPMessage _message;
/** cached message class name, for use after we discard the message */
private String _messageType;
@@ -121,6 +123,8 @@ public class OutNetMessage {
*/
public RouterInfo getTarget() { return _target; }
public void setTarget(RouterInfo target) { _target = target; }
+ public Hash getTargetHash() { return _targetHash; }
+ public void setTargetHash(Hash target) { _targetHash = target; }
/**
* Specifies the message to be sent
*
diff --git a/router/java/src/net/i2p/router/OutNetMessagePool.java b/router/java/src/net/i2p/router/OutNetMessagePool.java
index 389c0636f..6bff628b6 100644
--- a/router/java/src/net/i2p/router/OutNetMessagePool.java
+++ b/router/java/src/net/i2p/router/OutNetMessagePool.java
@@ -49,7 +49,10 @@ public class OutNetMessagePool {
+ " of type " + msg.getMessageType());
boolean valid = validate(msg);
- if (!valid) return;
+ if (!valid) {
+ _context.messageRegistry().unregisterPending(msg);
+ return;
+ }
MessageSelector selector = msg.getReplySelector();
if (selector != null) {
_context.messageRegistry().registerPending(msg);
diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java
index 7b6c009c3..cf7291c90 100644
--- a/router/java/src/net/i2p/router/Router.java
+++ b/router/java/src/net/i2p/router/Router.java
@@ -501,6 +501,7 @@ public class Router {
}
buf.append("\n");
out.write(buf.toString());
+ out.flush();
}
private static int MAX_MSG_LENGTH = 120;
diff --git a/router/java/src/net/i2p/router/Shitlist.java b/router/java/src/net/i2p/router/Shitlist.java
index adca46ec7..f3cc0e1de 100644
--- a/router/java/src/net/i2p/router/Shitlist.java
+++ b/router/java/src/net/i2p/router/Shitlist.java
@@ -50,7 +50,10 @@ public class Shitlist {
return shitlistRouter(peer, null);
}
public boolean shitlistRouter(Hash peer, String reason) {
- if (peer == null) return false;
+ if (peer == null) {
+ _log.error("wtf, why did we try to shitlist null?", new Exception("shitfaced"));
+ return false;
+ }
if (_context.routerHash().equals(peer)) {
_log.error("wtf, why did we try to shitlist ourselves?", new Exception("shitfaced"));
return false;
@@ -71,6 +74,7 @@ public class Shitlist {
//_context.netDb().fail(peer);
_context.tunnelManager().peerFailed(peer);
+ _context.messageRegistry().peerFailed(peer);
return wasAlready;
}
@@ -153,5 +157,6 @@ public class Shitlist {
}
buf.append("\n");
out.write(buf.toString());
+ out.flush();
}
}
diff --git a/router/java/src/net/i2p/router/admin/StatsGenerator.java b/router/java/src/net/i2p/router/admin/StatsGenerator.java
index e7bdfd719..150c1c644 100644
--- a/router/java/src/net/i2p/router/admin/StatsGenerator.java
+++ b/router/java/src/net/i2p/router/admin/StatsGenerator.java
@@ -92,6 +92,7 @@ public class StatsGenerator {
}
out.write("
");
}
+ out.flush();
}
private void renderFrequency(String name, StringBuffer buf) {
diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java
index abe32939d..c677b0a9a 100644
--- a/router/java/src/net/i2p/router/client/ClientManager.java
+++ b/router/java/src/net/i2p/router/client/ClientManager.java
@@ -377,6 +377,7 @@ public class ClientManager {
buf.append("\n
\n");
out.write(buf.toString());
+ out.flush();
}
public void messageReceived(ClientMessage msg) {
diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java
index 549044885..dc17e5cdd 100644
--- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java
+++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java
@@ -559,11 +559,31 @@ public class SendTunnelMessageJob extends JobImpl {
outM.setOnSendJob(_onSend);
outM.setPriority(_priority);
outM.setReplySelector(_selector);
- outM.setTarget(null);
+ if (_destRouter != null)
+ outM.setTargetHash(_destRouter);
+ else
+ outM.setTargetHash(getContext().routerHash());
getContext().messageRegistry().registerPending(outM);
+ _onFailure = new FakeOnFailJob(getContext(), outM, _onFailure);
// we dont really need the data
outM.discardData();
}
+ private class FakeOnFailJob extends JobImpl {
+ private OutNetMessage _fakeMessage;
+ private Job _realOnFailJob;
+ public FakeOnFailJob(RouterContext ctx, OutNetMessage msg, Job realOnFailJob) {
+ super(ctx);
+ _fakeMessage = msg;
+ _realOnFailJob = realOnFailJob;
+ }
+ public String getName() { return "Fake message failure job"; }
+ public void runJob() {
+ getContext().messageRegistry().unregisterPending(_fakeMessage);
+ if (_realOnFailJob != null)
+ getContext().jobQueue().addJob(_realOnFailJob);
+ }
+ }
+
public String getName() { return "Send Tunnel Message"; }
}
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index 52357e2ea..db710c94a 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -816,6 +816,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (!_initialized) {
buf.append("Not initialized\n");
out.write(buf.toString());
+ out.flush();
return;
}
Set leases = getLeases();
@@ -896,6 +897,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
buf.append("\n");
}
out.write(buf.toString());
+ out.flush();
}
private void renderRouterInfo(StringBuffer buf, RouterInfo info, boolean isUs) {
diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java
index 909d183d4..19fbc88a1 100644
--- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java
+++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java
@@ -127,6 +127,7 @@ class ProfileOrganizerRenderer {
buf.append("Capacity: ").append(num(_organizer.getCapacityThreshold())).append(" (").append(reliable).append(" high capacity peers)
");
buf.append("Integration: ").append(num(_organizer.getIntegrationThreshold())).append(" (").append(integrated).append(" well integrated peers)
");
out.write(buf.toString());
+ out.flush();
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java
index 17d0942ec..400dbfd29 100644
--- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java
+++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java
@@ -437,6 +437,7 @@ public class FIFOBandwidthLimiter {
}
buf.append("
\n");
out.write(buf.toString());
+ out.flush();
}
private static long __requestId = 0;
diff --git a/router/java/src/net/i2p/router/transport/GetBidsJob.java b/router/java/src/net/i2p/router/transport/GetBidsJob.java
index c079c9283..827c1cf48 100644
--- a/router/java/src/net/i2p/router/transport/GetBidsJob.java
+++ b/router/java/src/net/i2p/router/transport/GetBidsJob.java
@@ -54,6 +54,11 @@ public class GetBidsJob extends JobImpl {
List bids = _facade.getBids(_msg);
if (bids.size() <= 0) {
_log.warn("No bids available for the message " + _msg);
+ Hash target = _msg.getTargetHash();
+ if (target == null)
+ target = _msg.getTarget().getIdentity().getHash();
+ getContext().shitlist().shitlistRouter(target, "No bids");
+ getContext().netDb().fail(target);
fail();
} else {
TransportBid bid = (TransportBid)bids.get(0);
@@ -74,7 +79,10 @@ public class GetBidsJob extends JobImpl {
getContext().messageRegistry().unregisterPending(_msg);
}
- getContext().profileManager().messageFailed(_msg.getTarget().getIdentity().getHash());
+ if (_msg.getTargetHash() != null)
+ getContext().profileManager().messageFailed(_msg.getTargetHash());
+ else
+ getContext().profileManager().messageFailed(_msg.getTarget().getIdentity().getHash());
_msg.discardData();
}
diff --git a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java
index ccada8769..813c09328 100644
--- a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java
+++ b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java
@@ -17,6 +17,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import net.i2p.data.Hash;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
@@ -27,6 +28,7 @@ import net.i2p.util.Log;
public class OutboundMessageRegistry {
private Log _log;
+ /** Expiration date (Long) to OutNetMessage */
private TreeMap _pendingMessages;
private RouterContext _context;
@@ -274,6 +276,30 @@ public class OutboundMessageRegistry {
}
}
}
+
+ public void peerFailed(Hash peer) {
+ List failed = null;
+ synchronized (_pendingMessages) {
+ for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) {
+ OutNetMessage msg = (OutNetMessage)iter.next();
+ if ( (msg.getTargetHash() != null) && (msg.getTargetHash().equals(peer)) ) {
+ if (failed == null)
+ failed = new ArrayList(4);
+ failed.add(msg);
+ iter.remove();
+ }
+ }
+ }
+ if (failed != null) {
+ for (int i = 0; i < failed.size(); i++) {
+ OutNetMessage msg = (OutNetMessage)failed.get(i);
+ msg.discardData();
+ if (msg.getOnFailedSendJob() != null)
+ _context.jobQueue().addJob(msg.getOnFailedSendJob());
+ }
+ }
+
+ }
public void renderStatusHTML(Writer out) throws IOException {
StringBuffer buf = new StringBuffer(8192);
@@ -296,6 +322,7 @@ public class OutboundMessageRegistry {
}
buf.append("");
out.write(buf.toString());
+ out.flush();
}
/**
diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java
index 1b81b404e..56429b1b6 100644
--- a/router/java/src/net/i2p/router/transport/TransportImpl.java
+++ b/router/java/src/net/i2p/router/transport/TransportImpl.java
@@ -183,13 +183,13 @@ public abstract class TransportImpl implements Transport {
msg.discardData();
}
} else {
+ MessageSelector selector = msg.getReplySelector();
if (_log.shouldLog(Log.INFO))
_log.info("Failed and no requeue allowed for a "
+ msg.getMessageSize() + " byte "
- + msg.getMessageType() + " message");
+ + msg.getMessageType() + " message with selector " + selector, new Exception("fail cause"));
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
- MessageSelector selector = msg.getReplySelector();
if (selector != null)
_context.messageRegistry().unregisterPending(msg);
log = true;
diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java
index be97c2eef..ba1f3719e 100644
--- a/router/java/src/net/i2p/router/transport/TransportManager.java
+++ b/router/java/src/net/i2p/router/transport/TransportManager.java
@@ -284,5 +284,6 @@ public class TransportManager implements TransportEventListener {
buf.append(str);
}
out.write(buf.toString());
+ out.flush();
}
}
diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java
index dde09f2c2..470f53ecf 100644
--- a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java
+++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java
@@ -140,6 +140,7 @@ public class ConnectionBuilder {
con.setSocket(_socket);
con.setRemoteRouterIdentity(_actualPeer.getIdentity());
con.setRemoteAddress(_remoteAddress);
+ con.setAttemptedPeer(_target.getIdentity().getHash());
if (_error == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Establishment successful! returning the con");
diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java
index e025198ae..e79b6619f 100644
--- a/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java
+++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java
@@ -599,6 +599,11 @@ public class ConnectionHandler {
} else {
status = STATUS_OK;
}
+
+ if (_actualPeer.getIdentity().getHash().equals(_context.routerHash())) {
+ status = STATUS_UNKNOWN;
+ props.setProperty("REASON", "wtf, talking to myself?");
+ }
baos.write(status);
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java
index e8a81d504..843ce2b04 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java
@@ -10,6 +10,7 @@ import java.util.ArrayList;
import java.util.List;
import net.i2p.data.DataHelper;
+import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessageReader;
@@ -25,6 +26,7 @@ public class TCPConnection {
private Log _log;
private RouterContext _context;
private RouterIdentity _ident;
+ private Hash _attemptedPeer;
private TCPAddress _remoteAddress;
private List _pendingMessages;
private InputStream _in;
@@ -55,10 +57,14 @@ public class TCPConnection {
public RouterIdentity getRemoteRouterIdentity() { return _ident; }
/** What is the peer's TCP address (using the IP address not hostname) */
public TCPAddress getRemoteAddress() { return _remoteAddress; }
+ /** Who we initially were trying to contact */
+ public Hash getAttemptedPeer() { return _attemptedPeer; }
/** Who are we talking with (or null if not identified) */
public void setRemoteRouterIdentity(RouterIdentity ident) { _ident = ident; }
/** What is the peer's TCP address (using the IP address not hostname) */
public void setRemoteAddress(TCPAddress addr) { _remoteAddress = addr; }
+ /** Who we initially were trying to contact */
+ public void setAttemptedPeer(Hash peer) { _attemptedPeer = peer; }
/**
* Actually start processing the messages on the connection (and reading
@@ -80,10 +86,21 @@ public class TCPConnection {
*
*/
public synchronized void closeConnection() {
- if (_log.shouldLog(Log.INFO))
- _log.info("Connection closed", new Exception("Closed by"));
+ if (_log.shouldLog(Log.INFO)) {
+ if (_ident != null)
+ _log.info("Connection between " + _ident.getHash().toBase64().substring(0,6)
+ + " and " + _context.routerHash().toBase64().substring(0,6)
+ + " closed", new Exception("Closed by"));
+ else
+ _log.info("Connection between " + _remoteAddress
+ + " and " + _context.routerHash().toBase64().substring(0,6)
+ + " closed", new Exception("Closed by"));
+ }
if (_closed) return;
_closed = true;
+ synchronized (_pendingMessages) {
+ _pendingMessages.notifyAll();
+ }
if (_runner != null)
_runner.stopRunning();
if (_reader != null)
@@ -112,6 +129,7 @@ public class TCPConnection {
synchronized (_pendingMessages) {
rv = new ArrayList(_pendingMessages);
_pendingMessages.clear();
+ _pendingMessages.notifyAll();
}
return rv;
}
@@ -172,7 +190,7 @@ public class TCPConnection {
}
/** How long has this connection been active for? */
- public long getLifetime() { return _context.clock().now() - _started; }
+ public long getLifetime() { return (_started <= 0 ? -1 : _context.clock().now() - _started); }
void setTransport(TCPTransport transport) { _transport = transport; }
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java
index 237d6217b..a9c86b923 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java
@@ -309,8 +309,8 @@ class TCPListener {
*/
public void timeReached() {
if (wasSuccessful()) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Handle successful");
+ //if (_log.shouldLog(Log.DEBUG))
+ // _log.debug("Handle successful");
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to handle in the time allotted");
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
index 51d158d87..8104eda6f 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
@@ -174,9 +174,17 @@ public class TCPTransport extends TransportImpl {
Hash peer = msg.getTarget().getIdentity().calculateHash();
con = (TCPConnection)_connectionsByIdent.get(peer);
if (con == null) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("No connections to " + peer.toBase64()
- + ", request one");
+ if (_log.shouldLog(Log.DEBUG)) {
+ StringBuffer buf = new StringBuffer(128);
+ buf.append("No connections to ");
+ buf.append(peer.toBase64().substring(0,6));
+ buf.append(", but we are connected to ");
+ for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) {
+ Hash cur = (Hash)iter.next();
+ buf.append(cur.toBase64().substring(0,6)).append(", ");
+ }
+ _log.debug(buf.toString());
+ }
List msgs = (List)_pendingMessages.get(peer);
if (msgs == null) {
msgs = new ArrayList(4);
@@ -208,60 +216,112 @@ public class TCPTransport extends TransportImpl {
}
List waitingMsgs = null;
- List oldCons = null;
+ List changedMsgs = null;
+ boolean alreadyConnected = false;
+ boolean changedIdents = false;
synchronized (_connectionLock) {
if (_connectionsByAddress.containsKey(remAddr.toString())) {
- if (oldCons == null)
- oldCons = new ArrayList(1);
- oldCons.add(_connectionsByAddress.remove(remAddr.toString()));
+ alreadyConnected = true;
+ } else {
+ _connectionsByAddress.put(remAddr.toString(), con);
}
- _connectionsByAddress.put(remAddr.toString(), con);
if (_connectionsByIdent.containsKey(ident.calculateHash())) {
- if (oldCons == null)
- oldCons = new ArrayList(1);
- oldCons.add(_connectionsByIdent.remove(ident.calculateHash()));
+ alreadyConnected = true;
+ } else {
+ _connectionsByIdent.put(ident.calculateHash(), con);
}
- _connectionsByIdent.put(ident.calculateHash(), con);
// just drop the _pending connections - the establisher should fail
// them accordingly.
_pendingConnectionsByAddress.remove(remAddr.toString());
_pendingConnectionsByIdent.remove(ident.calculateHash());
+ if ( (con.getAttemptedPeer() != null) && (!ident.getHash().equals(con.getAttemptedPeer())) ) {
+ changedIdents = true;
+ _pendingConnectionsByIdent.remove(con.getAttemptedPeer());
+ changedMsgs = (List)_pendingMessages.remove(con.getAttemptedPeer());
+ }
- waitingMsgs = (List)_pendingMessages.remove(ident.calculateHash());
- }
-
- // close any old connections, moving any queued messages to the new one
- if (oldCons != null) {
- for (int i = 0; i < oldCons.size(); i++) {
- TCPConnection cur = (TCPConnection)oldCons.get(i);
- List msgs = cur.clearPendingMessages();
- for (int j = 0; j < msgs.size(); j++) {
- con.addMessage((OutNetMessage)msgs.get(j));
+ if (!alreadyConnected)
+ waitingMsgs = (List)_pendingMessages.remove(ident.calculateHash());
+
+ if (_log.shouldLog(Log.DEBUG)) {
+ StringBuffer buf = new StringBuffer(256);
+ buf.append("\nConnection to ").append(ident.getHash().toBase64().substring(0,6));
+ buf.append(" built. Already connected? ");
+ buf.append(alreadyConnected);
+ buf.append("\nconnectionsByAddress: (cur=").append(remAddr.toString()).append(") ");
+ for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) {
+ String addr = (String)iter.next();
+ buf.append(addr).append(" ");
}
- cur.closeConnection();
+ buf.append("\nconnectionsByIdent: ");
+ for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) {
+ Hash h = (Hash)iter.next();
+ buf.append(h.toBase64().substring(0,6)).append(" ");
+ }
+
+ _log.debug(buf.toString());
}
}
- if (waitingMsgs != null) {
- for (int i = 0; i < waitingMsgs.size(); i++) {
- con.addMessage((OutNetMessage)waitingMsgs.get(i));
+ if (changedIdents) {
+ _context.shitlist().shitlistRouter(con.getAttemptedPeer(), "Changed identities");
+ if (changedMsgs != null) {
+ for (int i = 0; i < changedMsgs.size(); i++) {
+ afterSend((OutNetMessage)changedMsgs.get(i), false, false, 0);
+ }
}
}
- _context.shitlist().unshitlistRouter(ident.calculateHash());
+ if (alreadyConnected) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Closing new duplicate");
+ con.setTransport(this);
+ con.closeConnection();
+ } else {
- con.setTransport(this);
- con.runConnection();
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Connection set to run");
+ if (waitingMsgs != null) {
+ for (int i = 0; i < waitingMsgs.size(); i++) {
+ con.addMessage((OutNetMessage)waitingMsgs.get(i));
+ }
+ }
+
+ _context.shitlist().unshitlistRouter(ident.calculateHash());
+
+ con.setTransport(this);
+ con.runConnection();
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Connection set to run");
+ }
}
void connectionClosed(TCPConnection con) {
synchronized (_connectionLock) {
- _connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
- _connectionsByAddress.remove(con.getRemoteAddress().toString());
+ TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
+ if (cur != con)
+ _connectionsByIdent.put(cur.getRemoteRouterIdentity().getHash(), cur);
+ cur = (TCPConnection)_connectionsByAddress.remove(con.getRemoteAddress().toString());
+ if (cur != con)
+ _connectionsByAddress.put(cur.getRemoteAddress().toString(), cur);
+
+ if (_log.shouldLog(Log.DEBUG)) {
+ StringBuffer buf = new StringBuffer(256);
+ buf.append("\nCLOSING ").append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6));
+ buf.append(".");
+ buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") ");
+ for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) {
+ String addr = (String)iter.next();
+ buf.append(addr).append(" ");
+ }
+ buf.append("\nconnectionsByIdent: ");
+ for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) {
+ Hash h = (Hash)iter.next();
+ buf.append(h.toBase64().substring(0,6)).append(" ");
+ }
+
+ _log.debug(buf.toString(), new Exception("Closed by"));
+ }
}
}
@@ -407,9 +467,15 @@ public class TCPTransport extends TransportImpl {
*
*/
private void updateAddress(TCPAddress addr) {
+ boolean restartListener = true;
+ if ( (addr.getPort() == getPort()) && (shouldListenToAllInterfaces()) )
+ restartListener = false;
+
RouterAddress routerAddr = addr.toRouterAddress();
_myAddress = addr;
- _listener.stopListening();
+
+ if (restartListener)
+ _listener.stopListening();
replaceAddress(routerAddr);
@@ -420,6 +486,7 @@ public class TCPTransport extends TransportImpl {
+ " and modified our routerInfo to have: "
+ _context.router().getRouterInfo().getAddresses());
+ // safe to do multiple times
_listener.startListening();
}
@@ -509,17 +576,71 @@ public class TCPTransport extends TransportImpl {
if (addr == null) {
_log.error("Message target has no TCP addresses! " + msg.getTarget());
iter.remove();
+ _context.shitlist().shitlistRouter(peer, "Peer "
+ + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6)
+ + " has no addresses");
+ _context.netDb().fail(peer);
+ for (int i = 0; i < msgs.size(); i++) {
+ OutNetMessage cur = (OutNetMessage)msgs.get(i);
+ afterSend(cur, false, false, 0);
+ }
continue;
}
TCPAddress tcpAddr = new TCPAddress(addr);
- if (tcpAddr.getPort() <= 0)
+ if (tcpAddr.getPort() <= 0) {
+ iter.remove();
+ _context.shitlist().shitlistRouter(peer, "Peer "
+ + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6)
+ + " has only invalid addresses");
+ _context.netDb().fail(peer);
+ for (int i = 0; i < msgs.size(); i++) {
+ OutNetMessage cur = (OutNetMessage)msgs.get(i);
+ afterSend(cur, false, false, 0);
+ }
continue; // invalid
+ }
if (_pendingConnectionsByAddress.contains(tcpAddr.toString()))
continue; // we're already trying to talk to someone at their address
+
+ if (_context.routerHash().equals(peer)) {
+ _log.error("Message points at us! " + msg.getTarget());
+ iter.remove();
+ _context.netDb().fail(peer);
+ for (int i = 0; i < msgs.size(); i++) {
+ OutNetMessage cur = (OutNetMessage)msgs.get(i);
+ afterSend(cur, false, false, 0);
+ }
+ continue;
+ }
+ if ( (_myAddress != null) && (_myAddress.equals(tcpAddr)) ) {
+ _log.error("Message points at our old TCP addresses! " + msg.getTarget());
+ iter.remove();
+ _context.shitlist().shitlistRouter(peer, "This is our old address...");
+ _context.netDb().fail(peer);
+ for (int i = 0; i < msgs.size(); i++) {
+ OutNetMessage cur = (OutNetMessage)msgs.get(i);
+ afterSend(cur, false, false, 0);
+ }
+ continue;
+ }
+ if (!allowAddress(tcpAddr)) {
+ _log.error("Message points at illegal address! " + msg.getTarget());
+
+ iter.remove();
+ _context.shitlist().shitlistRouter(peer, "Invalid addressaddress...");
+ _context.netDb().fail(peer);
+ for (int i = 0; i < msgs.size(); i++) {
+ OutNetMessage cur = (OutNetMessage)msgs.get(i);
+ afterSend(cur, false, false, 0);
+ }
+ continue;
+ }
// ok, this is someone we can try to contact. mark it as ours.
_pendingConnectionsByIdent.add(peer);
_pendingConnectionsByAddress.add(tcpAddr.toString());
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Add pending connection to: " + peer.toBase64().substring(0,6));
return msg.getTarget();
}
diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java
index e087f6417..9c8762d41 100644
--- a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java
+++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java
@@ -671,6 +671,7 @@ class TunnelPool {
ClientTunnelPool pool = getClientPool(dest);
renderTunnels(out, buf, "Inbound tunnels for " + dest.calculateHash() + " - (still connected? " + (!pool.isStopped()) + ")", pool.getInboundTunnelIds());
}
+ out.flush();
}
private void renderTunnels(Writer out, StringBuffer buf, String msg, Set tunnelIds) throws IOException {