This commit is contained in:
jrandom
2004-06-27 19:39:45 +00:00
committed by zzz
parent f312318fab
commit 5c1e001a73
5 changed files with 93 additions and 19 deletions

View File

@ -226,6 +226,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
Destination dest = I2PTunnel.destFromName(destination); Destination dest = I2PTunnel.destFromName(destination);
if (dest == null) { if (dest == null) {
l.log("Could not resolve " + destination + "."); l.log("Could not resolve " + destination + ".");
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to resolve " + destination + " (proxy? " + usingWWWProxy + ", request: " + targetRequest);
writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, destination); writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, destination);
s.close(); s.close();
return; return;
@ -365,6 +367,9 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
private void handleHTTPClientException(Exception ex, OutputStream out, String targetRequest, private void handleHTTPClientException(Exception ex, OutputStream out, String targetRequest,
boolean usingWWWProxy, String wwwProxy) { boolean usingWWWProxy, String wwwProxy) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error sending to " + wwwProxy + " (proxy? " + usingWWWProxy + ", request: " + targetRequest, ex);
if (out != null) { if (out != null) {
try { try {
writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, wwwProxy); writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, wwwProxy);

View File

@ -131,8 +131,11 @@ class I2PSocketImpl implements I2PSocket {
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
} }
if ((maxWait >= 0) && (System.currentTimeMillis() >= dieAfter)) long now = System.currentTimeMillis();
throw new InterruptedIOException("Timed out waiting for remote ID"); if ((maxWait >= 0) && (now >= dieAfter)) {
long waitedExcess = now - dieAfter;
throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess + "ms too long [" + maxWait + "ms])");
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("TIMING: RemoteID set to " _log.debug("TIMING: RemoteID set to "

View File

@ -419,17 +419,25 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
void sendMessage(I2CPMessage message) throws I2PSessionException { void sendMessage(I2CPMessage message) throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed"); if (isClosed()) throw new I2PSessionException("Already closed");
long beforeSync = _context.clock().now();
long inSync = 0;
try { try {
synchronized (_out) { synchronized (_out) {
inSync = _context.clock().now();
message.writeMessage(_out); message.writeMessage(_out);
_out.flush(); _out.flush();
} }
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Message written out and flushed");
} catch (I2CPMessageException ime) { } catch (I2CPMessageException ime) {
throw new I2PSessionException(getPrefix() + "Error writing out the message", ime); throw new I2PSessionException(getPrefix() + "Error writing out the message", ime);
} catch (IOException ioe) { } catch (IOException ioe) {
throw new I2PSessionException(getPrefix() + "Error writing out the message", ioe); throw new I2PSessionException(getPrefix() + "Error writing out the message", ioe);
} }
long afterSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message written out and flushed w/ "
+ (inSync-beforeSync) + "ms to sync and "
+ (afterSync-inSync) + "ms to send");;
} }
/** /**

View File

@ -62,7 +62,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
} }
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException { public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException {
return sendMessage(dest, payload, new SessionKey(), new HashSet(64)); return sendMessage(dest, payload, new SessionKey(), new HashSet(64));
} }
@ -137,18 +136,29 @@ class I2PSessionImpl2 extends I2PSessionImpl {
} }
} }
long beforeSendingSync = _context.clock().now();
long inSendingSync = 0;
synchronized (_sendingStates) { synchronized (_sendingStates) {
inSendingSync = _context.clock().now();
_sendingStates.add(state); _sendingStates.add(state);
} }
long afterSendingSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
+ state.getNonce()); + state.getNonce()
+ " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
long beforeWaitFor = _context.clock().now();
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
_context.clock().now() + getTimeout()); _context.clock().now() + getTimeout());
long afterWaitFor = _context.clock().now();
long inRemovingSync = 0;
synchronized (_sendingStates) { synchronized (_sendingStates) {
inRemovingSync = _context.clock().now();
_sendingStates.remove(state); _sendingStates.remove(state);
} }
long afterRemovingSync = _context.clock().now();
boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId() _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId()
@ -206,22 +216,34 @@ class I2PSessionImpl2 extends I2PSessionImpl {
} }
} }
long beforeSendingSync = _context.clock().now();
long inSendingSync = 0;
synchronized (_sendingStates) { synchronized (_sendingStates) {
inSendingSync = _context.clock().now();
_sendingStates.add(state); _sendingStates.add(state);
} }
long afterSendingSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
+ state.getNonce()); + state.getNonce()
+ " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
long beforeWaitFor = _context.clock().now();
if (isGuaranteed()) if (isGuaranteed())
state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS,
_context.clock().now() + SEND_TIMEOUT); _context.clock().now() + SEND_TIMEOUT);
else else
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
_context.clock().now() + SEND_TIMEOUT); _context.clock().now() + SEND_TIMEOUT);
long afterWaitFor = _context.clock().now();
long inRemovingSync = 0;
synchronized (_sendingStates) { synchronized (_sendingStates) {
inRemovingSync = _context.clock().now();
_sendingStates.remove(state); _sendingStates.remove(state);
} }
long afterRemovingSync = _context.clock().now();
boolean found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS); boolean found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
@ -229,7 +251,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, getPrefix() + "State with nonce " + state.getNonce() _log.log(Log.CRIT, getPrefix() + "State with nonce " + state.getNonce()
+ " was not accepted? (no messageId!! found=" + found + " was not accepted? (no messageId!! found=" + found
+ " msgId=" + state.getMessageId() + ")"); + " msgId=" + state.getMessageId()
+ ", sendingSync=" + (afterSendingSync-beforeSendingSync)
+ ", sendMessage=" + (beforeWaitFor-afterSendingSync)
+ ", waitFor=" + (afterWaitFor-beforeWaitFor)
+ ", removingSync=" + (afterRemovingSync-afterWaitFor)
+ ")");
//if (true) //if (true)
// throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up"); // throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up");
nackTags(state); nackTags(state);
@ -280,7 +307,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
public void receiveStatus(int msgId, long nonce, int status) { public void receiveStatus(int msgId, long nonce, int status) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
MessageState state = null; MessageState state = null;
long beforeSync = _context.clock().now();
long inSync = 0;
synchronized (_sendingStates) { synchronized (_sendingStates) {
inSync = _context.clock().now();
for (Iterator iter = _sendingStates.iterator(); iter.hasNext();) { for (Iterator iter = _sendingStates.iterator(); iter.hasNext();) {
state = (MessageState) iter.next(); state = (MessageState) iter.next();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce()); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce());
@ -296,6 +326,11 @@ class I2PSessionImpl2 extends I2PSessionImpl {
} }
} }
} }
long afterSync = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("receiveStatus(" + msgId + ", " + nonce + ", " + status+ "): sync: "
+ (inSync-beforeSync) + "ms, check: " + (afterSync-inSync));
if (state != null) { if (state != null) {
if (state.getMessageId() == null) { if (state.getMessageId() == null) {

View File

@ -363,8 +363,9 @@ public class ClientConnectionRunner {
*/ */
private class ClientWriterRunner implements Runnable { private class ClientWriterRunner implements Runnable {
private List _messagesToWrite; private List _messagesToWrite;
private long _lastAdded;
public ClientWriterRunner() { public ClientWriterRunner() {
_messagesToWrite = new ArrayList(2); _messagesToWrite = new ArrayList(4);
} }
/** /**
@ -374,7 +375,8 @@ public class ClientConnectionRunner {
public void addMessage(I2CPMessage msg) { public void addMessage(I2CPMessage msg) {
synchronized (_messagesToWrite) { synchronized (_messagesToWrite) {
_messagesToWrite.add(msg); _messagesToWrite.add(msg);
_messagesToWrite.notify(); _lastAdded = _context.clock().now();
_messagesToWrite.notifyAll();
} }
} }
@ -384,28 +386,40 @@ public class ClientConnectionRunner {
*/ */
public void stopWriting() { public void stopWriting() {
synchronized (_messagesToWrite) { synchronized (_messagesToWrite) {
_messagesToWrite.notify(); _messagesToWrite.notifyAll();
} }
} }
public void run() { public void run() {
while (!_dead) { while (!_dead) {
I2CPMessage msg = null; I2CPMessage msg = null;
long beforeCheckSync = _context.clock().now();
long inCheckSync = 0;
synchronized (_messagesToWrite) { synchronized (_messagesToWrite) {
inCheckSync = _context.clock().now();
if (_messagesToWrite.size() > 0) { if (_messagesToWrite.size() > 0) {
// we do this test before and after wait, in case more than
// one message gets enqueued
msg = (I2CPMessage)_messagesToWrite.remove(0); msg = (I2CPMessage)_messagesToWrite.remove(0);
} else { } else {
try { try {
_messagesToWrite.wait(); _messagesToWrite.wait();
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {
if (_messagesToWrite.size() > 0) if (_messagesToWrite.size() > 0)
msg = (I2CPMessage)_messagesToWrite.remove(0); msg = (I2CPMessage)_messagesToWrite.remove(0);
} }
} }
if (msg != null) }
long afterCheckSync = _context.clock().now();
if (msg != null) {
writeMessage(msg); writeMessage(msg);
long afterWriteMessage = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("writeMessage: check sync took "
+ (inCheckSync-beforeCheckSync) + "ms, writemessage took "
+ (afterWriteMessage-afterCheckSync)
+ "ms, time since addMessage(): " +
+ (afterCheckSync-_lastAdded));
}
} }
} }
@ -417,7 +431,8 @@ public class ClientConnectionRunner {
_out.flush(); _out.flush();
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("after doSend of a "+ msg.getClass().getName() + " message"); _log.debug("after writeMessage("+ msg.getClass().getName() + "): "
+ (_context.clock().now()-before) + "ms");;
} catch (I2CPMessageException ime) { } catch (I2CPMessageException ime) {
_log.error("Message exception sending I2CP message", ime); _log.error("Message exception sending I2CP message", ime);
stopRunning(); stopRunning();
@ -456,7 +471,15 @@ public class ClientConnectionRunner {
+ _config.getDestination().calculateHash().toBase64()); + _config.getDestination().calculateHash().toBase64());
} }
_writer.addMessage(msg); _writer.addMessage(msg);
if (_log.shouldLog(Log.DEBUG)) {
if ( (_config == null) || (_config.getDestination() == null) )
_log.debug("after doSend of a "+ msg.getClass().getName()
+ " message on for establishing i2cp con");
else
_log.debug("after doSend of a "+ msg.getClass().getName()
+ " message on for "
+ _config.getDestination().calculateHash().toBase64());
}
} }
// this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME // this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME