propagate from branch 'i2p.i2p' (head 1de143fff53bb56e6eac926d6293d62200f0c392)

to branch 'i2p.i2p.zzz.multisess' (head 70fc07857232668b93ca6ba02c433dffc7639132)
This commit is contained in:
zzz
2015-06-08 21:50:42 +00:00
51 changed files with 2036 additions and 263 deletions

View File

@ -874,6 +874,9 @@ class Connection {
*/
public void setOptions(ConnectionOptions opts) { _options = opts; }
/** @since 0.9.21 */
public ConnectionManager getConnectionManager() { return _connectionManager; }
public I2PSession getSession() { return _connectionManager.getSession(); }
public I2PSocketFull getSocket() { return _socket; }
public void setSocket(I2PSocketFull socket) { _socket = socket; }

View File

@ -88,7 +88,7 @@ class ConnectionManager {
// As of 0.9.1, listen on configured port (default 0 = all)
int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
_session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort());
_outboundQueue = new PacketQueue(_context, _session, this);
_outboundQueue = new PacketQueue(_context, _session);
_recentlyClosed = new LHMCache<Long, Object>(32);
/** Socket timeout for accept() */
_soTimeout = -1;
@ -429,7 +429,8 @@ class ConnectionManager {
// try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
} else {
con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false);
con = new Connection(_context, this, _schedulerChooser, _timer,
_outboundQueue, _conPacketHandler, opts, false);
con.setRemotePeer(peer);
assignReceiveStreamId(con);
break; // stop looping as a psuedo-wait
@ -890,4 +891,12 @@ class ConnectionManager {
if (req != null)
req.pong(payload);
}
/**
* @since 0.9.20
*/
@Override
public String toString() {
return "ConnectionManager for " + _session;
}
}

View File

@ -1,26 +1,40 @@
package net.i2p.client.streaming.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.crypto.SigType;
import net.i2p.data.Certificate;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.PrivateKey;
import net.i2p.data.PublicKey;
import net.i2p.data.SimpleDataStructure;
import net.i2p.util.ConvertToHash;
import net.i2p.util.Log;
/**
@ -37,6 +51,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private final I2PAppContext _context;
private final Log _log;
private final I2PSession _session;
private final ConcurrentHashMap<I2PSession, ConnectionManager> _subsessions;
private final I2PServerSocketFull _serverSocket;
private StandardServerSocket _realServerSocket;
private final ConnectionOptions _defaultOptions;
@ -45,7 +60,52 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private static final AtomicInteger __managerId = new AtomicInteger();
private final ConnectionManager _connectionManager;
private final AtomicBoolean _isDestroyed = new AtomicBoolean();
/** @since 0.9.20 */
private static final Set<Hash> _dsaOnly = new HashSet<Hash>(16);
private static final String[] DSA_ONLY_HASHES = {
// list from http://zzz.i2p/topics/1682?page=1#p8414
// bzr.welterde.i2p
"Cvs1gCZTTkgD2Z2byh2J9atPmh5~I8~L7BNQnQl0hUE=",
// docs.i2p2.i2p
"WCXV87RdrF6j-mnn6qt7kVSBifHTlPL0PmVMFWwaolo=",
// flibusta.i2p
"yy2hYtqqfl84N9skwdRkeM7baFMXHKyDWU3XRShlEo8=",
// forum.i2p
"3t5Ar2NCTIOId70uzX2bZyJljR0aBogxMEzNyHirB7A=",
// i2jump.i2p
"9vaoGZbOaeqdRK2qEunlwRM9mUSW-I9R4OON35TDKK4=",
// irc.welterde.i2p
"5rjezx4McFk3bNhoJV-NTLlQW1AR~jiUcN6DOWMCCVc=",
// lists.i2p2.i2p
"qwtgoFoMSK0TOtbT4ovBX1jHUzCoZCPzrJVxjKD7RCg=",
// mtn.i2p2.i2p
"X5VDzYaoX9-P6bAWnrVSR5seGLkOeORP2l3Mh4drXPo=",
// nntp.welterde.i2p
"VXwmNIwMy1BcUVmut0oZ72jbWoqFzvxJukmS-G8kAAE=",
// paste.i2p2.i2p
"DoyMyUUgOSTddvRpqYfKHFPPjkkX~iQmResyfjjBYWs=",
// syndie.wetlerde.i2p
"xMxC54BFgyp-~zzrQI3F8m2CK--9XMcNmSAep6RH4Kk=",
// ugha.i2p
"zsu3WF~QLBxZXH-gHq9MuZE6y8ROZmMF7dA2MbMMKkY=",
// tracker.welterde.i2p
"EVkFgKkrDKyGfI7TIuDmlHoAmvHC~FbnY946DfujR0A=",
// www.i2p2.i2p
"im9gytzKT15mT1sB5LC9bHXCcwytQ4EPcrGQhoam-4w="
};
static {
for (int i = 0; i < DSA_ONLY_HASHES.length; i++) {
String s = DSA_ONLY_HASHES[i];
Hash h = ConvertToHash.getHash(s);
if (h != null)
_dsaOnly.add(h);
else
System.out.println("Bad hash " + s);
}
}
/**
* How long to wait for the client app to accept() before sending back CLOSE?
* This includes the time waiting in the queue. Currently set to 5 seconds.
@ -80,6 +140,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context;
_session = session;
_subsessions = new ConcurrentHashMap<I2PSession, ConnectionManager>(4);
_log = _context.logManager().getLog(I2PSocketManagerFull.class);
_name = name + " " + (__managerId.incrementAndGet());
@ -120,6 +181,100 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return _session;
}
/**
* @return a new subsession, non-null
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
* and different signing keys
* @param opts subsession options if any, may be null
* @since 0.9.19
*/
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
if (privateKeyStream == null) {
// We don't actually need the same pubkey in the dest, just in the LS.
// The dest one is unused. But this is how we find the LS keys
// to reuse in RequestLeaseSetMessageHandler.
ByteArrayOutputStream keyStream = new ByteArrayOutputStream(1024);
try {
SigType type = getSigType(opts);
if (type != SigType.DSA_SHA1) {
// hassle, have to set up the padding and cert, see I2PClientImpl
throw new I2PSessionException("type " + type + " unsupported");
}
PublicKey pub = _session.getMyDestination().getPublicKey();
PrivateKey priv = _session.getDecryptionKey();
SimpleDataStructure[] keys = _context.keyGenerator().generateSigningKeys(type);
pub.writeBytes(keyStream);
keys[0].writeBytes(keyStream); // signing pub
Certificate.NULL_CERT.writeBytes(keyStream);
priv.writeBytes(keyStream);
keys[1].writeBytes(keyStream); // signing priv
} catch (Exception e) {
throw new I2PSessionException("Error creating keys", e);
}
privateKeyStream = new ByteArrayInputStream(keyStream.toByteArray());
}
I2PSession rv = _session.addSubsession(privateKeyStream, opts);
ConnectionOptions defaultOptions = new ConnectionOptions(opts);
ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions);
ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager);
if (old != null) {
// shouldn't happen
_session.removeSubsession(rv);
connectionManager.shutdown();
throw new I2PSessionException("dup");
}
if (_log.shouldLog(Log.WARN))
_log.warn("Added subsession " + rv);
return rv;
}
/**
* @param opts may be null
* @since 0.9.20 copied from I2PSocketManagerFactory
*/
private SigType getSigType(Properties opts) {
if (opts != null) {
String st = opts.getProperty(I2PClient.PROP_SIGTYPE);
if (st != null) {
SigType rv = SigType.parseSigType(st);
if (rv != null && rv.isAvailable())
return rv;
if (rv != null)
st = rv.toString();
_log.logAlways(Log.WARN, "Unsupported sig type " + st +
", reverting to " + I2PClient.DEFAULT_SIGTYPE);
// TODO throw instead?
}
}
return I2PClient.DEFAULT_SIGTYPE;
}
/**
* Remove the subsession
*
* @since 0.9.19
*/
public void removeSubsession(I2PSession session) {
_session.removeSubsession(session);
ConnectionManager cm = _subsessions.remove(session);
if (cm != null) {
cm.shutdown();
if (_log.shouldLog(Log.WARN))
_log.warn("Removeed subsession " + session);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Subsession not found to remove " + session);
}
}
/**
* @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19
*/
public List<I2PSession> getSubsessions() {
return _session.getSubsessions();
}
public ConnectionManager getConnectionManager() {
return _connectionManager;
}
@ -262,11 +417,16 @@ public class I2PSocketManagerFull implements I2PSocketManager {
}
private void verifySession() throws I2PException {
verifySession(_connectionManager);
}
/** @since 0.9.20 */
private void verifySession(ConnectionManager cm) throws I2PException {
if (_isDestroyed.get())
throw new I2PException("Session was closed");
if (!_connectionManager.getSession().isClosed())
if (!cm.getSession().isClosed())
return;
_connectionManager.getSession().connect();
cm.getSession().connect();
}
/**
@ -285,7 +445,6 @@ public class I2PSocketManagerFull implements I2PSocketManager {
*/
public I2PSocket connect(Destination peer, I2PSocketOptions options)
throws I2PException, NoRouteToHostException {
verifySession();
if (options == null)
options = _defaultOptions;
ConnectionOptions opts = null;
@ -297,8 +456,23 @@ public class I2PSocketManagerFull implements I2PSocketManager {
if (_log.shouldLog(Log.INFO))
_log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
+ " with options: " + opts);
// pick the subsession here
ConnectionManager cm = _connectionManager;
if (!_subsessions.isEmpty()) {
Hash h = peer.calculateHash();
if (_dsaOnly.contains(h)) {
// FIXME just taking the first one for now
for (Map.Entry<I2PSession, ConnectionManager> e : _subsessions.entrySet()) {
if (e.getKey().getMyDestination().getSigType() == SigType.DSA_SHA1) {
cm = e.getValue();
break;
}
}
}
}
verifySession(cm);
// the following blocks unless connect delay > 0
Connection con = _connectionManager.connect(peer, opts);
Connection con = cm.connect(peer, opts);
if (con == null)
throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns());
I2PSocketFull socket = new I2PSocketFull(con,_context);
@ -381,6 +555,12 @@ public class I2PSocketManagerFull implements I2PSocketManager {
}
_connectionManager.setAllowIncomingConnections(false);
_connectionManager.shutdown();
if (!_subsessions.isEmpty()) {
for (I2PSession sess : _subsessions.keySet()) {
removeSubsession(sess);
}
}
// should we destroy the _session too?
// yes, since the old lib did (and SAM wants it to, and i dont know why not)
if ( (_session != null) && (!_session.isClosed()) ) {

View File

@ -50,7 +50,7 @@ class MessageHandler implements I2PSessionMuxedListener {
* @param size size of the message
*/
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
byte data[] = null;
byte data[];
try {
data = session.receiveMessage(msgId);
} catch (I2PSessionException ise) {
@ -59,7 +59,17 @@ class MessageHandler implements I2PSessionMuxedListener {
_log.warn("Error receiving the message", ise);
return;
}
if (data == null) return;
if (data == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received null data on " + session + " proto: " + proto +
" fromPort: " + fromPort + " toPort: " + toPort);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received " + data.length + " bytes on " + session +
" (" + _manager + ')' +
" proto: " + proto +
" fromPort: " + fromPort + " toPort: " + toPort);
Packet packet = new Packet();
try {
packet.readPacket(data, 0, data.length);

View File

@ -568,6 +568,27 @@ class MessageInputStream extends InputStream {
@Override
public void close() {
synchronized (_dataLock) {
if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(128);
buf.append("close(), ready bytes: ");
long available = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++)
available += _readyDataBlocks.get(i).getValid();
available -= _readyDataBlockIndex;
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
buf.append(" not ready blocks: ");
long notAvailable = 0;
for (Long id : _notYetReadyBlocks.keySet()) {
ByteArray ba = _notYetReadyBlocks.get(id);
buf.append(id).append(" ");
if (ba != null)
notAvailable += ba.getValid();
}
buf.append("not ready bytes: ").append(notAvailable);
buf.append(" highest ready block: ").append(_highestReadyBlockId);
_log.debug(buf.toString());
}
//while (_readyDataBlocks.size() > 0)
// _cache.release((ByteArray)_readyDataBlocks.remove(0));
_readyDataBlocks.clear();

View File

@ -766,7 +766,7 @@ class Packet {
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS ").append(_optionMaxSize);
if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE");
if (isFlagSet(FLAG_RESET)) buf.append(" RESET");
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG");
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG ").append(_optionSignature.length());
if (isFlagSet(FLAG_SIGNATURE_REQUESTED)) buf.append(" SIGREQ");
if (isFlagSet(FLAG_SYNCHRONIZE)) buf.append(" SYN");
}

View File

@ -29,7 +29,6 @@ class PacketQueue implements SendMessageStatusListener {
private final I2PAppContext _context;
private final Log _log;
private final I2PSession _session;
private final ConnectionManager _connectionManager;
private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
private final Map<Long, Connection> _messageStatusMap;
private volatile boolean _dead;
@ -46,10 +45,9 @@ class PacketQueue implements SendMessageStatusListener {
private static final long REMOVE_EXPIRED_TIME = 67*1000;
private static final boolean ENABLE_STATUS_LISTEN = true;
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
public PacketQueue(I2PAppContext context, I2PSession session) {
_context = context;
_session = session;
_connectionManager = mgr;
_log = context.logManager().getLog(PacketQueue.class);
_messageStatusMap = new ConcurrentHashMap<Long, Connection>(16);
new RemoveExpired();
@ -199,9 +197,10 @@ class PacketQueue implements SendMessageStatusListener {
//packet.setTagsSent(tagsSent);
packet.incrementSends();
Connection c = packet.getConnection();
String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null);
if (_log.shouldDebug())
_connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix);
if (c != null) {
String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO();
c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
}
if (I2PSocketManagerFull.pcapWriter != null &&
_context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))
packet.logTCPDump();