- Move BWLimits and DestLookup message support from
        I2PSimpleSession to I2PSessionImpl
      - Include the Hash in the DestReplyMessage on a failed lookup
        so the client may correlate replies
      - Add support for parallel lookups and BWLimits requests
      - Add support for specifying the timeout for DestLookups
        (can only be smaller than the router timeout for now)
      - Extend dest lookup router timeout from 10s to 15s
This commit is contained in:
zzz
2010-12-26 20:36:44 +00:00
parent 49b11bb49e
commit 443abce647
8 changed files with 211 additions and 78 deletions

View File

@ -14,11 +14,13 @@ import java.util.StringTokenizer;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketEepGet;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.data.Base32;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
@ -316,21 +318,44 @@ public class I2PSnarkUtil {
}
}
private static final int BASE32_HASH_LENGTH = 52; // 1 + Hash.HASH_LENGTH * 8 / 5
/** Base64 Hash or Hash.i2p or name.i2p using naming service */
Destination getDestination(String ip) {
if (ip == null) return null;
if (ip.endsWith(".i2p")) {
if (ip.length() < 520) { // key + ".i2p"
Destination dest = _context.namingService().lookup(ip);
if (dest != null)
return dest;
if (_manager != null && ip.length() == BASE32_HASH_LENGTH + 8 && ip.endsWith(".b32.i2p")) {
// Use existing I2PSession for b32 lookups if we have it
// This is much more efficient than using the naming service
I2PSession sess = _manager.getSession();
if (sess != null) {
byte[] b = Base32.decode(ip.substring(0, BASE32_HASH_LENGTH));
if (b != null) {
Hash h = new Hash(b);
if (_log.shouldLog(Log.INFO))
_log.info("Using existing session for lookup of " + ip);
try {
return sess.lookupDest(h);
} catch (I2PSessionException ise) {
}
}
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Using naming service for lookup of " + ip);
return _context.namingService().lookup(ip);
}
if (_log.shouldLog(Log.INFO))
_log.info("Creating Destination for " + ip);
try {
return new Destination(ip.substring(0, ip.length()-4)); // sans .i2p
} catch (DataFormatException dfe) {
return null;
}
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Creating Destination for " + ip);
try {
return new Destination(ip);
} catch (DataFormatException dfe) {

View File

@ -10,6 +10,9 @@ import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.DestReplyMessage;
import net.i2p.util.Log;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
/**
* Handle I2CP dest replies from the router
*/
@ -22,6 +25,12 @@ class DestReplyMessageHandler extends HandlerImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle message " + message);
DestReplyMessage msg = (DestReplyMessage) message;
((I2PSimpleSession)session).destReceived(msg.getDestination());
Destination d = msg.getDestination();
if (d != null)
session.destReceived(d);
Hash h = msg.getHash();
if (h != null)
session.destLookupFailed(h);
// else let it time out
}
}

View File

@ -10,6 +10,8 @@ package net.i2p.client;
*/
import net.i2p.I2PAppContext;
import net.i2p.data.i2cp.BandwidthLimitsMessage;
import net.i2p.data.i2cp.DestReplyMessage;
import net.i2p.data.i2cp.DisconnectMessage;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.MessageStatusMessage;
@ -36,6 +38,8 @@ class I2PClientMessageHandlerMap {
highest = Math.max(highest, MessagePayloadMessage.MESSAGE_TYPE);
highest = Math.max(highest, MessageStatusMessage.MESSAGE_TYPE);
highest = Math.max(highest, SetDateMessage.MESSAGE_TYPE);
highest = Math.max(highest, DestReplyMessage.MESSAGE_TYPE);
highest = Math.max(highest, BandwidthLimitsMessage.MESSAGE_TYPE);
_handlers = new I2CPMessageHandler[highest+1];
_handlers[DisconnectMessage.MESSAGE_TYPE] = new DisconnectMessageHandler(context);
@ -44,6 +48,8 @@ class I2PClientMessageHandlerMap {
_handlers[MessagePayloadMessage.MESSAGE_TYPE] = new MessagePayloadMessageHandler(context);
_handlers[MessageStatusMessage.MESSAGE_TYPE] = new MessageStatusMessageHandler(context);
_handlers[SetDateMessage.MESSAGE_TYPE] = new SetDateMessageHandler(context);
_handlers[DestReplyMessage.MESSAGE_TYPE] = new DestReplyMessageHandler(context);
_handlers[BandwidthLimitsMessage.MESSAGE_TYPE] = new BWLimitsMessageHandler(context);
}
public I2CPMessageHandler getHandler(int messageTypeId) {

View File

@ -138,13 +138,21 @@ public interface I2PSession {
public SigningPrivateKey getPrivateKey();
/**
* Lookup up a Hash
*
* Lookup a Destination by Hash.
* Blocking. Waits a max of 10 seconds by default.
*/
public Destination lookupDest(Hash h) throws I2PSessionException;
/**
* Get the current bandwidth limits
* Blocking.
* @param maxWait ms
* @since 0.8.3
* @return null on failure
*/
public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException;
/**
* Get the current bandwidth limits. Blocking.
*/
public int[] bandwidthLimits() throws I2PSessionException;

View File

@ -15,7 +15,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@ -23,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
@ -33,6 +34,8 @@ import net.i2p.data.PrivateKey;
import net.i2p.data.SessionKey;
import net.i2p.data.SessionTag;
import net.i2p.data.SigningPrivateKey;
import net.i2p.data.i2cp.DestLookupMessage;
import net.i2p.data.i2cp.GetBandwidthLimitsMessage;
import net.i2p.data.i2cp.GetDateMessage;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageException;
@ -95,6 +98,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected I2CPMessageProducer _producer;
/** map of Long --> MessagePayloadMessage */
protected Map<Long, MessagePayloadMessage> _availableMessages;
/** hashes of lookups we are waiting for */
protected final LinkedBlockingQueue<LookupWaiter> _pendingLookups = new LinkedBlockingQueue();
protected final Object _bwReceivedLock = new Object();
protected int[] _bwLimits;
protected I2PClientMessageHandlerMap _handlerMap;
@ -786,12 +794,104 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
return buf.toString();
}
public Destination lookupDest(Hash h) throws I2PSessionException {
return null;
/** called by the message handler */
void destReceived(Destination d) {
Hash h = d.calculateHash();
for (LookupWaiter w : _pendingLookups) {
if (w.hash.equals(h)) {
w.destination = d;
synchronized (w) {
w.notifyAll();
}
}
}
}
/** called by the message handler */
void destLookupFailed(Hash h) {
for (LookupWaiter w : _pendingLookups) {
if (w.hash.equals(h)) {
synchronized (w) {
w.notifyAll();
}
}
}
}
/** called by the message handler */
void bwReceived(int[] i) {
_bwLimits = i;
synchronized (_bwReceivedLock) {
_bwReceivedLock.notifyAll();
}
}
/**
* Simple object to wait for lookup replies
* @since 0.8.3
*/
private static class LookupWaiter {
/** the request */
public final Hash hash;
/** the reply */
public Destination destination;
public LookupWaiter(Hash h) {
this.hash = h;
}
}
/**
* Blocking. Waits a max of 10 seconds by default.
* See lookupDest with maxWait parameter to change.
* Implemented in 0.8.3 in I2PSessionImpl;
* previously was available only in I2PSimpleSession.
* Multiple outstanding lookups are now allowed.
* @return null on failure
*/
public Destination lookupDest(Hash h) throws I2PSessionException {
return lookupDest(h, 10*1000);
}
/**
* Blocking.
* @param maxWait ms
* @since 0.8.3
* @return null on failure
*/
public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException {
if (_closed)
return null;
LookupWaiter waiter = new LookupWaiter(h);
_pendingLookups.offer(waiter);
sendMessage(new DestLookupMessage(h));
try {
synchronized (waiter) {
waiter.wait(maxWait);
}
} catch (InterruptedException ie) {}
_pendingLookups.remove(waiter);
return waiter.destination;
}
/**
* Blocking. Waits a max of 5 seconds.
* But shouldn't take long.
* Implemented in 0.8.3 in I2PSessionImpl;
* previously was available only in I2PSimpleSession.
* Multiple outstanding lookups are now allowed.
* @return null on failure
*/
public int[] bandwidthLimits() throws I2PSessionException {
return null;
if (_closed)
return null;
sendMessage(new GetBandwidthLimitsMessage());
try {
synchronized (_bwReceivedLock) {
_bwReceivedLock.wait(5*1000);
}
} catch (InterruptedException ie) {}
return _bwLimits;
}
protected void updateActivity() {

View File

@ -33,12 +33,6 @@ import net.i2p.util.I2PAppThread;
* @author zzz
*/
class I2PSimpleSession extends I2PSessionImpl2 {
private boolean _destReceived;
private /* FIXME final FIXME */ Object _destReceivedLock;
private Destination _destination;
private boolean _bwReceived;
private /* FIXME final FIXME */ Object _bwReceivedLock;
private int[] _bwLimits;
/**
* Create a new session for doing naming and bandwidth queries only. Do not create a destination.
@ -104,57 +98,6 @@ class I2PSimpleSession extends I2PSessionImpl2 {
}
}
/** called by the message handler */
void destReceived(Destination d) {
_destReceived = true;
_destination = d;
synchronized (_destReceivedLock) {
_destReceivedLock.notifyAll();
}
}
void bwReceived(int[] i) {
_bwReceived = true;
_bwLimits = i;
synchronized (_bwReceivedLock) {
_bwReceivedLock.notifyAll();
}
}
@Override
public Destination lookupDest(Hash h) throws I2PSessionException {
if (_closed)
return null;
_destReceivedLock = new Object();
sendMessage(new DestLookupMessage(h));
for (int i = 0; i < 10 && !_destReceived; i++) {
try {
synchronized (_destReceivedLock) {
_destReceivedLock.wait(1000);
}
} catch (InterruptedException ie) {}
}
_destReceived = false;
return _destination;
}
@Override
public int[] bandwidthLimits() throws I2PSessionException {
if (_closed)
return null;
_bwReceivedLock = new Object();
sendMessage(new GetBandwidthLimitsMessage());
for (int i = 0; i < 5 && !_bwReceived; i++) {
try {
synchronized (_bwReceivedLock) {
_bwReceivedLock.wait(1000);
}
} catch (InterruptedException ie) {}
}
_bwReceived = false;
return _bwLimits;
}
/**
* Only map message handlers that we will use
*/

View File

@ -22,6 +22,13 @@ import net.i2p.data.Hash;
*
* All calls are blocking and return null on failure.
* Timeout is set to 10 seconds in I2PSimpleSession.
*
* As of 0.8.3, standard I2PSessions support lookups,
* including multiple lookups in parallel, and overriding
* the default timeout.
* Using an existing I2PSession is much more efficient and
* flexible than using this class.
*
*/
class LookupDest {

View File

@ -13,14 +13,18 @@ import java.io.InputStream;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
/**
* Response to DestLookupMessage
*
* Response to DestLookupMessage.
* As of 0.8.3, the response may include the hash from the request, indicating
* a failure for a specific request.
* Payload may be empty (failure), a Hash (failure), or a Destination.
*/
public class DestReplyMessage extends I2CPMessageImpl {
public final static int MESSAGE_TYPE = 35;
private Destination _dest;
private Hash _hash;
public DestReplyMessage() {
super();
@ -30,23 +34,52 @@ public class DestReplyMessage extends I2CPMessageImpl {
_dest = d;
}
/**
* @param h non-null with non-null data
* @since 0.8.3
*/
public DestReplyMessage(Hash h) {
_hash = h;
}
public Destination getDestination() {
return _dest;
}
/**
* @since 0.8.3
*/
public Hash getHash() {
return _hash;
}
protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException {
try {
Destination d = new Destination();
d.readBytes(in);
_dest = d;
} catch (DataFormatException dfe) {
_dest = null; // null dest allowed
if (size == 0) {
_dest = null;
_hash = null;
} else {
try {
if (size == Hash.HASH_LENGTH) {
Hash h = new Hash();
h.readBytes(in);
_hash = h;
} else {
Destination d = new Destination();
d.readBytes(in);
_dest = d;
}
} catch (DataFormatException dfe) {
_dest = null;
_hash = null;
}
}
}
protected byte[] doWriteMessage() throws I2CPMessageException, IOException {
if (_dest == null)
if (_dest == null && _hash == null)
return new byte[0]; // null response allowed
if (_dest == null && _hash != null)
return _hash.getData();
ByteArrayOutputStream os = new ByteArrayOutputStream(_dest.size());
try {
_dest.writeBytes(os);
@ -65,7 +98,8 @@ public class DestReplyMessage extends I2CPMessageImpl {
public boolean equals(Object object) {
if ((object != null) && (object instanceof DestReplyMessage)) {
DestReplyMessage msg = (DestReplyMessage) object;
return DataHelper.eq(getDestination(), msg.getDestination());
return DataHelper.eq(getDestination(), msg.getDestination()) &&
DataHelper.eq(getHash(), msg.getHash());
}
return false;
}
@ -75,6 +109,7 @@ public class DestReplyMessage extends I2CPMessageImpl {
StringBuilder buf = new StringBuilder();
buf.append("[DestReplyMessage: ");
buf.append("\n\tDestination: ").append(_dest);
buf.append("\n\tHash: ").append(_hash);
buf.append("]");
return buf.toString();
}