* always flush the console output (duh)

* deal with some degenerate situations with identities changing and auto IP detection
* some catch-alls for cleaning up the registry on degenerate tunnels
* lots of logging
This commit is contained in:
jrandom
2004-09-30 13:10:02 +00:00
committed by zzz
parent 6eb7ecc2d4
commit 6804a0c564
21 changed files with 268 additions and 46 deletions

View File

@ -617,6 +617,7 @@ public class JobQueue {
buf.append(j.toString()).append("</li>\n");
}
buf.append("</ol>\n");
out.flush();
buf.append("# timed jobs: ").append(timedJobs.size()).append("<ol>\n");
TreeMap ordered = new TreeMap();

View File

@ -21,6 +21,7 @@ import java.util.Set;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.util.Log;
@ -34,6 +35,7 @@ public class OutNetMessage {
private Log _log;
private RouterContext _context;
private RouterInfo _target;
private Hash _targetHash;
private I2NPMessage _message;
/** cached message class name, for use after we discard the message */
private String _messageType;
@ -121,6 +123,8 @@ public class OutNetMessage {
*/
public RouterInfo getTarget() { return _target; }
public void setTarget(RouterInfo target) { _target = target; }
public Hash getTargetHash() { return _targetHash; }
public void setTargetHash(Hash target) { _targetHash = target; }
/**
* Specifies the message to be sent
*

View File

@ -49,7 +49,10 @@ public class OutNetMessagePool {
+ " of type " + msg.getMessageType());
boolean valid = validate(msg);
if (!valid) return;
if (!valid) {
_context.messageRegistry().unregisterPending(msg);
return;
}
MessageSelector selector = msg.getReplySelector();
if (selector != null) {
_context.messageRegistry().registerPending(msg);

View File

@ -501,6 +501,7 @@ public class Router {
}
buf.append("</table>\n");
out.write(buf.toString());
out.flush();
}
private static int MAX_MSG_LENGTH = 120;

View File

@ -50,7 +50,10 @@ public class Shitlist {
return shitlistRouter(peer, null);
}
public boolean shitlistRouter(Hash peer, String reason) {
if (peer == null) return false;
if (peer == null) {
_log.error("wtf, why did we try to shitlist null?", new Exception("shitfaced"));
return false;
}
if (_context.routerHash().equals(peer)) {
_log.error("wtf, why did we try to shitlist ourselves?", new Exception("shitfaced"));
return false;
@ -71,6 +74,7 @@ public class Shitlist {
//_context.netDb().fail(peer);
_context.tunnelManager().peerFailed(peer);
_context.messageRegistry().peerFailed(peer);
return wasAlready;
}
@ -153,5 +157,6 @@ public class Shitlist {
}
buf.append("</ul>\n");
out.write(buf.toString());
out.flush();
}
}

View File

@ -92,6 +92,7 @@ public class StatsGenerator {
}
out.write("</ul><hr />");
}
out.flush();
}
private void renderFrequency(String name, StringBuffer buf) {

View File

@ -377,6 +377,7 @@ public class ClientManager {
buf.append("\n<hr />\n");
out.write(buf.toString());
out.flush();
}
public void messageReceived(ClientMessage msg) {

View File

@ -559,11 +559,31 @@ public class SendTunnelMessageJob extends JobImpl {
outM.setOnSendJob(_onSend);
outM.setPriority(_priority);
outM.setReplySelector(_selector);
outM.setTarget(null);
if (_destRouter != null)
outM.setTargetHash(_destRouter);
else
outM.setTargetHash(getContext().routerHash());
getContext().messageRegistry().registerPending(outM);
_onFailure = new FakeOnFailJob(getContext(), outM, _onFailure);
// we dont really need the data
outM.discardData();
}
private class FakeOnFailJob extends JobImpl {
private OutNetMessage _fakeMessage;
private Job _realOnFailJob;
public FakeOnFailJob(RouterContext ctx, OutNetMessage msg, Job realOnFailJob) {
super(ctx);
_fakeMessage = msg;
_realOnFailJob = realOnFailJob;
}
public String getName() { return "Fake message failure job"; }
public void runJob() {
getContext().messageRegistry().unregisterPending(_fakeMessage);
if (_realOnFailJob != null)
getContext().jobQueue().addJob(_realOnFailJob);
}
}
public String getName() { return "Send Tunnel Message"; }
}

View File

@ -816,6 +816,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (!_initialized) {
buf.append("<i>Not initialized</i>\n");
out.write(buf.toString());
out.flush();
return;
}
Set leases = getLeases();
@ -896,6 +897,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
buf.append("</table>\n");
}
out.write(buf.toString());
out.flush();
}
private void renderRouterInfo(StringBuffer buf, RouterInfo info, boolean isUs) {

View File

@ -127,6 +127,7 @@ class ProfileOrganizerRenderer {
buf.append("<b>Capacity:</b> ").append(num(_organizer.getCapacityThreshold())).append(" (").append(reliable).append(" high capacity peers)<br />");
buf.append("<b>Integration:</b> ").append(num(_organizer.getIntegrationThreshold())).append(" (").append(integrated).append(" well integrated peers)<br />");
out.write(buf.toString());
out.flush();
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));

View File

@ -437,6 +437,7 @@ public class FIFOBandwidthLimiter {
}
buf.append("</ol></li></ul>\n");
out.write(buf.toString());
out.flush();
}
private static long __requestId = 0;

View File

@ -54,6 +54,11 @@ public class GetBidsJob extends JobImpl {
List bids = _facade.getBids(_msg);
if (bids.size() <= 0) {
_log.warn("No bids available for the message " + _msg);
Hash target = _msg.getTargetHash();
if (target == null)
target = _msg.getTarget().getIdentity().getHash();
getContext().shitlist().shitlistRouter(target, "No bids");
getContext().netDb().fail(target);
fail();
} else {
TransportBid bid = (TransportBid)bids.get(0);
@ -74,6 +79,9 @@ public class GetBidsJob extends JobImpl {
getContext().messageRegistry().unregisterPending(_msg);
}
if (_msg.getTargetHash() != null)
getContext().profileManager().messageFailed(_msg.getTargetHash());
else
getContext().profileManager().messageFailed(_msg.getTarget().getIdentity().getHash());
_msg.discardData();

View File

@ -17,6 +17,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import net.i2p.data.Hash;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
@ -27,6 +28,7 @@ import net.i2p.util.Log;
public class OutboundMessageRegistry {
private Log _log;
/** Expiration date (Long) to OutNetMessage */
private TreeMap _pendingMessages;
private RouterContext _context;
@ -275,6 +277,30 @@ public class OutboundMessageRegistry {
}
}
public void peerFailed(Hash peer) {
List failed = null;
synchronized (_pendingMessages) {
for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) {
OutNetMessage msg = (OutNetMessage)iter.next();
if ( (msg.getTargetHash() != null) && (msg.getTargetHash().equals(peer)) ) {
if (failed == null)
failed = new ArrayList(4);
failed.add(msg);
iter.remove();
}
}
}
if (failed != null) {
for (int i = 0; i < failed.size(); i++) {
OutNetMessage msg = (OutNetMessage)failed.get(i);
msg.discardData();
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
}
}
}
public void renderStatusHTML(Writer out) throws IOException {
StringBuffer buf = new StringBuffer(8192);
buf.append("<h2>Pending messages</h2>\n");
@ -296,6 +322,7 @@ public class OutboundMessageRegistry {
}
buf.append("</ul>");
out.write(buf.toString());
out.flush();
}
/**

View File

@ -183,13 +183,13 @@ public abstract class TransportImpl implements Transport {
msg.discardData();
}
} else {
MessageSelector selector = msg.getReplySelector();
if (_log.shouldLog(Log.INFO))
_log.info("Failed and no requeue allowed for a "
+ msg.getMessageSize() + " byte "
+ msg.getMessageType() + " message");
+ msg.getMessageType() + " message with selector " + selector, new Exception("fail cause"));
if (msg.getOnFailedSendJob() != null)
_context.jobQueue().addJob(msg.getOnFailedSendJob());
MessageSelector selector = msg.getReplySelector();
if (selector != null)
_context.messageRegistry().unregisterPending(msg);
log = true;

View File

@ -284,5 +284,6 @@ public class TransportManager implements TransportEventListener {
buf.append(str);
}
out.write(buf.toString());
out.flush();
}
}

View File

@ -140,6 +140,7 @@ public class ConnectionBuilder {
con.setSocket(_socket);
con.setRemoteRouterIdentity(_actualPeer.getIdentity());
con.setRemoteAddress(_remoteAddress);
con.setAttemptedPeer(_target.getIdentity().getHash());
if (_error == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Establishment successful! returning the con");

View File

@ -600,6 +600,11 @@ public class ConnectionHandler {
status = STATUS_OK;
}
if (_actualPeer.getIdentity().getHash().equals(_context.routerHash())) {
status = STATUS_UNKNOWN;
props.setProperty("REASON", "wtf, talking to myself?");
}
baos.write(status);
DataHelper.writeProperties(baos, props);

View File

@ -10,6 +10,7 @@ import java.util.ArrayList;
import java.util.List;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessageReader;
@ -25,6 +26,7 @@ public class TCPConnection {
private Log _log;
private RouterContext _context;
private RouterIdentity _ident;
private Hash _attemptedPeer;
private TCPAddress _remoteAddress;
private List _pendingMessages;
private InputStream _in;
@ -55,10 +57,14 @@ public class TCPConnection {
public RouterIdentity getRemoteRouterIdentity() { return _ident; }
/** What is the peer's TCP address (using the IP address not hostname) */
public TCPAddress getRemoteAddress() { return _remoteAddress; }
/** Who we initially were trying to contact */
public Hash getAttemptedPeer() { return _attemptedPeer; }
/** Who are we talking with (or null if not identified) */
public void setRemoteRouterIdentity(RouterIdentity ident) { _ident = ident; }
/** What is the peer's TCP address (using the IP address not hostname) */
public void setRemoteAddress(TCPAddress addr) { _remoteAddress = addr; }
/** Who we initially were trying to contact */
public void setAttemptedPeer(Hash peer) { _attemptedPeer = peer; }
/**
* Actually start processing the messages on the connection (and reading
@ -80,10 +86,21 @@ public class TCPConnection {
*
*/
public synchronized void closeConnection() {
if (_log.shouldLog(Log.INFO))
_log.info("Connection closed", new Exception("Closed by"));
if (_log.shouldLog(Log.INFO)) {
if (_ident != null)
_log.info("Connection between " + _ident.getHash().toBase64().substring(0,6)
+ " and " + _context.routerHash().toBase64().substring(0,6)
+ " closed", new Exception("Closed by"));
else
_log.info("Connection between " + _remoteAddress
+ " and " + _context.routerHash().toBase64().substring(0,6)
+ " closed", new Exception("Closed by"));
}
if (_closed) return;
_closed = true;
synchronized (_pendingMessages) {
_pendingMessages.notifyAll();
}
if (_runner != null)
_runner.stopRunning();
if (_reader != null)
@ -112,6 +129,7 @@ public class TCPConnection {
synchronized (_pendingMessages) {
rv = new ArrayList(_pendingMessages);
_pendingMessages.clear();
_pendingMessages.notifyAll();
}
return rv;
}
@ -172,7 +190,7 @@ public class TCPConnection {
}
/** How long has this connection been active for? */
public long getLifetime() { return _context.clock().now() - _started; }
public long getLifetime() { return (_started <= 0 ? -1 : _context.clock().now() - _started); }
void setTransport(TCPTransport transport) { _transport = transport; }

View File

@ -309,8 +309,8 @@ class TCPListener {
*/
public void timeReached() {
if (wasSuccessful()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle successful");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Handle successful");
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to handle in the time allotted");

View File

@ -174,9 +174,17 @@ public class TCPTransport extends TransportImpl {
Hash peer = msg.getTarget().getIdentity().calculateHash();
con = (TCPConnection)_connectionsByIdent.get(peer);
if (con == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No connections to " + peer.toBase64()
+ ", request one");
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(128);
buf.append("No connections to ");
buf.append(peer.toBase64().substring(0,6));
buf.append(", but we are connected to ");
for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) {
Hash cur = (Hash)iter.next();
buf.append(cur.toBase64().substring(0,6)).append(", ");
}
_log.debug(buf.toString());
}
List msgs = (List)_pendingMessages.get(peer);
if (msgs == null) {
msgs = new ArrayList(4);
@ -208,42 +216,71 @@ public class TCPTransport extends TransportImpl {
}
List waitingMsgs = null;
List oldCons = null;
List changedMsgs = null;
boolean alreadyConnected = false;
boolean changedIdents = false;
synchronized (_connectionLock) {
if (_connectionsByAddress.containsKey(remAddr.toString())) {
if (oldCons == null)
oldCons = new ArrayList(1);
oldCons.add(_connectionsByAddress.remove(remAddr.toString()));
}
alreadyConnected = true;
} else {
_connectionsByAddress.put(remAddr.toString(), con);
}
if (_connectionsByIdent.containsKey(ident.calculateHash())) {
if (oldCons == null)
oldCons = new ArrayList(1);
oldCons.add(_connectionsByIdent.remove(ident.calculateHash()));
}
alreadyConnected = true;
} else {
_connectionsByIdent.put(ident.calculateHash(), con);
}
// just drop the _pending connections - the establisher should fail
// them accordingly.
_pendingConnectionsByAddress.remove(remAddr.toString());
_pendingConnectionsByIdent.remove(ident.calculateHash());
if ( (con.getAttemptedPeer() != null) && (!ident.getHash().equals(con.getAttemptedPeer())) ) {
changedIdents = true;
_pendingConnectionsByIdent.remove(con.getAttemptedPeer());
changedMsgs = (List)_pendingMessages.remove(con.getAttemptedPeer());
}
if (!alreadyConnected)
waitingMsgs = (List)_pendingMessages.remove(ident.calculateHash());
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(256);
buf.append("\nConnection to ").append(ident.getHash().toBase64().substring(0,6));
buf.append(" built. Already connected? ");
buf.append(alreadyConnected);
buf.append("\nconnectionsByAddress: (cur=").append(remAddr.toString()).append(") ");
for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) {
String addr = (String)iter.next();
buf.append(addr).append(" ");
}
buf.append("\nconnectionsByIdent: ");
for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) {
Hash h = (Hash)iter.next();
buf.append(h.toBase64().substring(0,6)).append(" ");
}
// close any old connections, moving any queued messages to the new one
if (oldCons != null) {
for (int i = 0; i < oldCons.size(); i++) {
TCPConnection cur = (TCPConnection)oldCons.get(i);
List msgs = cur.clearPendingMessages();
for (int j = 0; j < msgs.size(); j++) {
con.addMessage((OutNetMessage)msgs.get(j));
}
cur.closeConnection();
_log.debug(buf.toString());
}
}
if (changedIdents) {
_context.shitlist().shitlistRouter(con.getAttemptedPeer(), "Changed identities");
if (changedMsgs != null) {
for (int i = 0; i < changedMsgs.size(); i++) {
afterSend((OutNetMessage)changedMsgs.get(i), false, false, 0);
}
}
}
if (alreadyConnected) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closing new duplicate");
con.setTransport(this);
con.closeConnection();
} else {
if (waitingMsgs != null) {
for (int i = 0; i < waitingMsgs.size(); i++) {
con.addMessage((OutNetMessage)waitingMsgs.get(i));
@ -257,11 +294,34 @@ public class TCPTransport extends TransportImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection set to run");
}
}
void connectionClosed(TCPConnection con) {
synchronized (_connectionLock) {
_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
_connectionsByAddress.remove(con.getRemoteAddress().toString());
TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(con.getRemoteRouterIdentity().getHash());
if (cur != con)
_connectionsByIdent.put(cur.getRemoteRouterIdentity().getHash(), cur);
cur = (TCPConnection)_connectionsByAddress.remove(con.getRemoteAddress().toString());
if (cur != con)
_connectionsByAddress.put(cur.getRemoteAddress().toString(), cur);
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(256);
buf.append("\nCLOSING ").append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6));
buf.append(".");
buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") ");
for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) {
String addr = (String)iter.next();
buf.append(addr).append(" ");
}
buf.append("\nconnectionsByIdent: ");
for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) {
Hash h = (Hash)iter.next();
buf.append(h.toBase64().substring(0,6)).append(" ");
}
_log.debug(buf.toString(), new Exception("Closed by"));
}
}
}
@ -407,8 +467,14 @@ public class TCPTransport extends TransportImpl {
*
*/
private void updateAddress(TCPAddress addr) {
boolean restartListener = true;
if ( (addr.getPort() == getPort()) && (shouldListenToAllInterfaces()) )
restartListener = false;
RouterAddress routerAddr = addr.toRouterAddress();
_myAddress = addr;
if (restartListener)
_listener.stopListening();
replaceAddress(routerAddr);
@ -420,6 +486,7 @@ public class TCPTransport extends TransportImpl {
+ " and modified our routerInfo to have: "
+ _context.router().getRouterInfo().getAddresses());
// safe to do multiple times
_listener.startListening();
}
@ -509,17 +576,71 @@ public class TCPTransport extends TransportImpl {
if (addr == null) {
_log.error("Message target has no TCP addresses! " + msg.getTarget());
iter.remove();
_context.shitlist().shitlistRouter(peer, "Peer "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6)
+ " has no addresses");
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
afterSend(cur, false, false, 0);
}
continue;
}
TCPAddress tcpAddr = new TCPAddress(addr);
if (tcpAddr.getPort() <= 0)
if (tcpAddr.getPort() <= 0) {
iter.remove();
_context.shitlist().shitlistRouter(peer, "Peer "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6)
+ " has only invalid addresses");
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
afterSend(cur, false, false, 0);
}
continue; // invalid
}
if (_pendingConnectionsByAddress.contains(tcpAddr.toString()))
continue; // we're already trying to talk to someone at their address
if (_context.routerHash().equals(peer)) {
_log.error("Message points at us! " + msg.getTarget());
iter.remove();
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
afterSend(cur, false, false, 0);
}
continue;
}
if ( (_myAddress != null) && (_myAddress.equals(tcpAddr)) ) {
_log.error("Message points at our old TCP addresses! " + msg.getTarget());
iter.remove();
_context.shitlist().shitlistRouter(peer, "This is our old address...");
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
afterSend(cur, false, false, 0);
}
continue;
}
if (!allowAddress(tcpAddr)) {
_log.error("Message points at illegal address! " + msg.getTarget());
iter.remove();
_context.shitlist().shitlistRouter(peer, "Invalid addressaddress...");
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
afterSend(cur, false, false, 0);
}
continue;
}
// ok, this is someone we can try to contact. mark it as ours.
_pendingConnectionsByIdent.add(peer);
_pendingConnectionsByAddress.add(tcpAddr.toString());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add pending connection to: " + peer.toBase64().substring(0,6));
return msg.getTarget();
}

View File

@ -671,6 +671,7 @@ class TunnelPool {
ClientTunnelPool pool = getClientPool(dest);
renderTunnels(out, buf, "Inbound tunnels for " + dest.calculateHash() + " - (still connected? " + (!pool.isStopped()) + ")", pool.getInboundTunnelIds());
}
out.flush();
}
private void renderTunnels(Writer out, StringBuffer buf, String msg, Set tunnelIds) throws IOException {