2004-10-23 jrandom

* Minor ministreaming lib refactoring to simplify integration of the full
      streaming lib.
    * Minor bugfixes to data structure serialization.
This commit is contained in:
jrandom
2004-10-24 01:42:34 +00:00
committed by zzz
parent 2b9e16c9c9
commit 813679ba25
9 changed files with 809 additions and 684 deletions

View File

@ -24,7 +24,7 @@ class I2PSocketImpl implements I2PSocket {
public static final int MAX_PACKET_SIZE = 1024 * 32;
public static final int PACKET_DELAY = 100;
private I2PSocketManager manager;
private I2PSocketManagerImpl manager;
private Destination local;
private Destination remote;
private String localID;
@ -69,7 +69,7 @@ class I2PSocketImpl implements I2PSocket {
* @param outgoing did we initiate the connection (true) or did we receive it (false)?
* @param localID what is our half of the socket ID?
*/
public I2PSocketImpl(Destination peer, I2PSocketManager mgr, boolean outgoing, String localID) {
public I2PSocketImpl(Destination peer, I2PSocketManagerImpl mgr, boolean outgoing, String localID) {
this.outgoing = outgoing;
manager = mgr;
remote = peer;
@ -157,7 +157,7 @@ class I2PSocketImpl implements I2PSocket {
if (_log.shouldLog(Log.DEBUG))
_log.debug("TIMING: RemoteID set to "
+ I2PSocketManager.getReadableForm(remoteID) + " for "
+ I2PSocketManagerImpl.getReadableForm(remoteID) + " for "
+ this.hashCode());
}
return remoteID;
@ -249,9 +249,9 @@ class I2PSocketImpl implements I2PSocket {
private byte getMask(int add) {
if (outgoing)
return (byte)(I2PSocketManager.DATA_IN + (byte)add);
return (byte)(I2PSocketManagerImpl.DATA_IN + (byte)add);
else
return (byte)(I2PSocketManager.DATA_OUT + (byte)add);
return (byte)(I2PSocketManagerImpl.DATA_OUT + (byte)add);
}
public void setOptions(I2PSocketOptions options) {
@ -614,7 +614,7 @@ class I2PSocketImpl implements I2PSocket {
_log.info(getPrefix() + ":" + Thread.currentThread().getName()
+ "Sending close packet: (we started? " + outgoing
+ ") after reading " + _bytesRead + " and writing " + _bytesWritten);
byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x02), remoteID, new byte[0]);
boolean sent = manager.getSession().sendMessage(remote, packet);
if (!sent) {
if (_log.shouldLog(Log.WARN))
@ -645,7 +645,7 @@ class I2PSocketImpl implements I2PSocket {
_log.error(getPrefix() + "NULL REMOTEID");
return false;
}
byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data);
byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x00), remoteID, data);
boolean sent;
synchronized (flagLock) {
if (closed2) return false;

View File

@ -34,62 +34,8 @@ import net.i2p.util.Log;
* or receive any messages with its .receiveMessage
*
*/
public class I2PSocketManager implements I2PSessionListener {
private I2PAppContext _context;
private Log _log;
private I2PSession _session;
private I2PServerSocketImpl _serverSocket = null;
private Object lock = new Object(); // for locking socket lists
private HashMap _outSockets;
private HashMap _inSockets;
private I2PSocketOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
private static int __managerId = 0;
public static final short ACK = 0x51;
public static final short CLOSE_OUT = 0x52;
public static final short DATA_OUT = 0x50;
public static final short SYN = 0xA1;
public static final short CLOSE_IN = 0xA2;
public static final short DATA_IN = 0xA0;
public static final short CHAFF = 0xFF;
/**
* How long to wait for the client app to accept() before sending back CLOSE?
* This includes the time waiting in the queue. Currently set to 5 seconds.
*/
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
public I2PSocketManager() {
this("SocketManager " + (++__managerId));
}
public I2PSocketManager(String name) {
_name = name;
_session = null;
_inSockets = new HashMap(16);
_outSockets = new HashMap(16);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(I2PSocketManager.class);
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.transferBalance", "How many streams send more than they receive (positive means more sent, negative means more received)?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.synNoAck", "How many times have we sent a SYN but not received an ACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.ackSendFailed", "How many times have we tried to send an ACK to a SYN and failed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.nackSent", "How many times have we refused a SYN with a NACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.nackReceived", "How many times have we received a NACK to our SYN?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public I2PSession getSession() {
return _session;
}
public void setSession(I2PSession session) {
_session = session;
if (session != null) session.setSessionListener(this);
}
public interface I2PSocketManager {
public I2PSession getSession();
/**
* How long should we wait for the client to .accept() a socket before
@ -97,343 +43,11 @@ public class I2PSocketManager implements I2PSessionListener {
*
* @param ms milliseconds to wait, maximum
*/
public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
public long getAcceptTimeout() { return _acceptTimeout; }
public void disconnected(I2PSession session) {
_log.info(getName() + ": Disconnected from the session");
destroySocketManager();
}
public void errorOccurred(I2PSession session, String message, Throwable error) {
_log.error(getName() + ": Error occurred: [" + message + "]", error);
}
public void messageAvailable(I2PSession session, int msgId, long size) {
try {
I2PSocketImpl s;
byte msg[] = session.receiveMessage(msgId);
if (msg.length == 1 && msg[0] == -1) {
_log.debug(getName() + ": Ping received");
return;
}
if (msg.length < 4) {
_log.warn(getName() + ": ==== packet too short ====");
return;
}
int type = msg[0] & 0xff;
String id = toString(new byte[] { msg[1], msg[2], msg[3]});
byte[] payload = new byte[msg.length - 4];
System.arraycopy(msg, 4, payload, 0, payload.length);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Message read: type = [" + Integer.toHexString(type)
+ "] id = [" + getReadableForm(id)
+ "] payload length: [" + payload.length + "]");
switch (type) {
case ACK:
ackAvailable(id, payload);
return;
case CLOSE_OUT:
disconnectAvailable(id, payload);
return;
case DATA_OUT:
sendOutgoingAvailable(id, payload);
return;
case SYN:
synIncomingAvailable(id, payload, session);
return;
case CLOSE_IN:
disconnectIncoming(id, payload);
return;
case DATA_IN:
sendIncoming(id, payload);
case CHAFF:
// ignore
return;
default:
handleUnknown(type, id, payload);
return;
}
} catch (I2PException ise) {
_log.warn(getName() + ": Error processing", ise);
} catch (IllegalStateException ise) {
_log.debug(getName() + ": Error processing", ise);
}
}
/**
* We've received an ACK packet (hopefully, in response to a SYN that we
* recently sent out). Notify the associated I2PSocket that we now have
* the remote stream ID (which should get things going, since the handshake
* is complete).
*
*/
private void ackAvailable(String id, byte payload[]) {
long begin = _context.clock().now();
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _outSockets.get(id);
}
if (s == null) {
_log.warn(getName() + ": No socket responsible for ACK packet for id " + getReadableForm(id));
return;
}
long socketRetrieved = _context.clock().now();
String remoteId = null;
remoteId = s.getRemoteID(false);
if ( (payload.length == 3) && (remoteId == null) ) {
String newID = toString(payload);
long beforeSetRemId = _context.clock().now();
s.setRemoteID(newID);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getName() + ": ackAvailable - socket retrieval took "
+ (socketRetrieved-begin) + "ms, getRemoteId took "
+ (beforeSetRemId-socketRetrieved) + "ms, setRemoteId took "
+ (_context.clock().now()-beforeSetRemId) + "ms");
}
return;
} else {
// (payload.length != 3 || getRemoteId != null)
if (_log.shouldLog(Log.WARN)) {
if (payload.length != 3)
_log.warn(getName() + ": Ack packet had " + payload.length + " bytes");
else
_log.warn(getName() + ": Remote ID already exists? " + remoteId);
}
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getName() + ": invalid ack - socket retrieval took "
+ (socketRetrieved-begin) + "ms, overall took "
+ (_context.clock().now()-begin) + "ms");
}
return;
}
}
/**
* We received a disconnect packet, telling us to tear down the specified
* stream.
*/
private void disconnectAvailable(String id, byte payload[]) {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _outSockets.get(id);
}
_log.debug(getName() + ": *Disconnect outgoing for socket " + s + " on id "
+ getReadableForm(id));
try {
if (s != null) {
if (payload.length > 0) {
_log.debug(getName() + ": Disconnect packet had "
+ payload.length + " bytes");
}
if (s.getRemoteID(false) == null) {
s.setRemoteID(null); // Just to wake up socket
return;
}
s.internalClose();
synchronized (lock) {
_outSockets.remove(id);
}
}
return;
} catch (Exception t) {
_log.warn(getName() + ": Ignoring error on disconnect for socket " + s, t);
}
}
/**
* We've received data on a stream we created - toss the data onto
* the socket for handling.
*
* @throws IllegalStateException if the socket isn't open or isn't known
*/
private void sendOutgoingAvailable(String id, byte payload[]) throws IllegalStateException {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _outSockets.get(id);
}
// packet send outgoing
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": *Packet send outgoing [" + payload.length + "] for socket "
+ s + " on id " + getReadableForm(id));
if (s != null) {
s.queueData(payload);
return;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Null socket with data available");
throw new IllegalStateException("Null socket with data available");
}
}
/**
* We've received a SYN packet (a request for a new stream). If the client has
* said they want incoming sockets (by retrieving the serverSocket), the stream
* will be ACKed, but if they have not, they'll be NACKed)
*
* @throws DataFormatException if the destination in the SYN was invalid
* @throws I2PSessionException if there was an I2P error sending the ACK or NACK
*/
private void synIncomingAvailable(String id, byte payload[], I2PSession session)
throws DataFormatException, I2PSessionException {
Destination d = new Destination();
d.fromByteArray(payload);
I2PSocketImpl s = null;
boolean acceptConnections = (_serverSocket != null);
String newLocalID = null;
synchronized (lock) {
newLocalID = makeID(_inSockets);
if (acceptConnections) {
s = new I2PSocketImpl(d, this, false, newLocalID);
s.setRemoteID(id);
}
}
_log.debug(getName() + ": *Syn! for socket " + s + " on id " + getReadableForm(newLocalID)
+ " from " + d.calculateHash().toBase64().substring(0,6));
if (!acceptConnections) {
// The app did not instantiate an I2PServerSocket
byte[] packet = makePacket((byte) CLOSE_OUT, id, toBytes(newLocalID));
boolean replySentOk = false;
synchronized (_session) {
replySentOk = _session.sendMessage(d, packet);
}
if (!replySentOk) {
_log.warn(getName() + ": Error sending close to " + d.calculateHash().toBase64()
+ " in response to a new con message",
new Exception("Failed creation"));
}
_context.statManager().addRateData("streaming.nackSent", 1, 1);
return;
}
if (_serverSocket.addWaitForAccept(s, _acceptTimeout)) {
_inSockets.put(newLocalID, s);
byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID));
boolean replySentOk = false;
replySentOk = _session.sendMessage(d, packet);
if (!replySentOk) {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Error sending reply to " + d.calculateHash().toBase64()
+ " in response to a new con message for socket " + s,
new Exception("Failed creation"));
s.internalClose();
_context.statManager().addRateData("streaming.ackSendFailed", 1, 1);
}
} else {
// timed out or serverSocket closed
byte[] packet = toBytes(" " + id);
packet[0] = CLOSE_OUT;
boolean nackSent = session.sendMessage(d, packet);
if (!nackSent) {
_log.warn(getName() + ": Error sending NACK for session creation for socket " + s);
}
s.internalClose();
_context.statManager().addRateData("streaming,nackSent", 1, 1);
}
return;
}
/**
* We've received a disconnect for a socket we didn't initiate, so kill
* the socket.
*
*/
private void disconnectIncoming(String id, byte payload[]) {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _inSockets.get(id);
if (payload.length == 0 && s != null) {
_inSockets.remove(id);
}
}
_log.debug(getName() + ": *Disconnect incoming for socket " + s);
try {
if (payload.length == 0 && s != null) {
s.internalClose();
return;
} else {
if ( (payload.length > 0) && (_log.shouldLog(Log.ERROR)) )
_log.warn(getName() + ": Disconnect packet had " + payload.length + " bytes");
if (s != null)
s.internalClose();
return;
}
} catch (Exception t) {
_log.warn(getName() + ": Ignoring error on disconnect", t);
return;
}
}
/**
* We've received data on a stream we received - toss the data onto
* the socket for handling.
*
* @throws IllegalStateException if the socket isn't open or isn't known
*/
private void sendIncoming(String id, byte payload[]) throws IllegalStateException {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _inSockets.get(id);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": *Packet send incoming [" + payload.length + "] for socket " + s);
if (s != null) {
s.queueData(payload);
return;
} else {
_log.info(getName() + ": Null socket with data available");
throw new IllegalStateException("Null socket with data available");
}
}
/**
* Unknown packet. moo.
*
*/
private void handleUnknown(int type, String id, byte payload[]) {
_log.error(getName() + ": \n\n=============== Unknown packet! " + "============"
+ "\nType: " + (int) type
+ "\nID: " + getReadableForm(id)
+ "\nBase64'ed Data: " + Base64.encode(payload)
+ "\n\n\n");
if (id != null) {
synchronized (lock) {
_inSockets.remove(id);
_outSockets.remove(id);
}
}
}
public void reportAbuse(I2PSession session, int severity) {
_log.error(getName() + ": Abuse reported [" + severity + "]");
}
public void setDefaultOptions(I2PSocketOptions options) {
_defaultOptions = options;
}
public I2PSocketOptions getDefaultOptions() {
return _defaultOptions;
}
public I2PServerSocket getServerSocket() {
if (_serverSocket == null) {
_serverSocket = new I2PServerSocketImpl(this);
}
return _serverSocket;
}
public void setAcceptTimeout(long ms);
public long getAcceptTimeout();
public void setDefaultOptions(I2PSocketOptions options);
public I2PSocketOptions getDefaultOptions();
public I2PServerSocket getServerSocket();
/**
* Create a new connected socket (block until the socket is created)
@ -448,117 +62,7 @@ public class I2PSocketManager implements I2PSessionListener {
*/
public I2PSocket connect(Destination peer, I2PSocketOptions options)
throws I2PException, ConnectException,
NoRouteToHostException, InterruptedIOException {
String localID, lcID;
I2PSocketImpl s;
synchronized (lock) {
localID = makeID(_outSockets);
lcID = getReadableForm(localID);
s = new I2PSocketImpl(peer, this, true, localID);
_outSockets.put(localID, s);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": connect(" + peer.calculateHash().toBase64().substring(0,6)
+ ", ...): localID = " + lcID);
try {
ByteArrayOutputStream pubkey = new ByteArrayOutputStream();
_session.getMyDestination().writeBytes(pubkey);
String remoteID;
byte[] packet = makePacket((byte) SYN, localID, pubkey.toByteArray());
boolean sent = false;
sent = _session.sendMessage(peer, packet);
if (!sent) {
_log.info(getName() + ": Unable to send & receive ack for SYN packet for socket "
+ s + " with localID = " + lcID);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new I2PException("Error sending through I2P network");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": syn sent ok to "
+ peer.calculateHash().toBase64().substring(0,6)
+ " with localID = " + lcID);
}
if (options != null)
remoteID = s.getRemoteID(true, options.getConnectTimeout());
else
remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout());
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": remoteID received from "
+ peer.calculateHash().toBase64().substring(0,6)
+ ": " + getReadableForm(remoteID)
+ " with localID = " + lcID);
if (remoteID == null) {
_context.statManager().addRateData("streaming.nackReceived", 1, 1);
throw new ConnectException("Connection refused by peer for socket " + s);
}
if ("".equals(remoteID)) {
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new NoRouteToHostException("Unable to reach peer for socket " + s);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": TIMING: s given out for remoteID "
+ getReadableForm(remoteID) + " for socket " + s);
return s;
} catch (InterruptedIOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Timeout waiting for ack from syn for id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ioe);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
s.internalClose();
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new InterruptedIOException("Timeout waiting for ack");
} catch (ConnectException ex) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Connection error waiting for ack from syn for id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
s.internalClose();
throw ex;
} catch (NoRouteToHostException ex) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": No route to host waiting for ack from syn for id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
s.internalClose();
throw ex;
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Error sending syn on id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
s.internalClose();
throw new I2PException("Unhandled IOException occurred");
} catch (I2PException ex) {
if (_log.shouldLog(Log.INFO))
_log.info(getName() + ": Error sending syn on id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
s.internalClose();
throw ex;
} catch (Exception e) {
s.internalClose();
_log.warn(getName() + ": Unhandled error connecting on "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, e);
throw new ConnectException("Unhandled error connecting: " + e.getMessage());
}
}
NoRouteToHostException, InterruptedIOException;
/**
* Create a new connected socket (block until the socket is created)
@ -571,182 +75,28 @@ public class I2PSocketManager implements I2PSessionListener {
* @throws I2PException if there is some other I2P-related problem
*/
public I2PSocket connect(Destination peer) throws I2PException, ConnectException,
NoRouteToHostException, InterruptedIOException {
return connect(peer, null);
}
NoRouteToHostException, InterruptedIOException;
/**
* Destroy the socket manager, freeing all the associated resources. This
* method will block untill all the managed sockets are closed.
*
*/
public void destroySocketManager() {
if (_serverSocket != null) {
_serverSocket.close();
_serverSocket = null;
}
synchronized (lock) {
Iterator iter;
String id = null;
I2PSocketImpl sock;
iter = _inSockets.keySet().iterator();
while (iter.hasNext()) {
id = (String)iter.next();
sock = (I2PSocketImpl)_inSockets.get(id);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Closing inSocket \""
+ getReadableForm(sock.getLocalID()) + "\"");
sock.internalClose();
}
iter = _outSockets.keySet().iterator();
while (iter.hasNext()) {
id = (String)iter.next();
sock = (I2PSocketImpl)_outSockets.get(id);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Closing outSocket \""
+ getReadableForm(sock.getLocalID()) + "\"");
sock.internalClose();
}
}
_log.debug(getName() + ": Waiting for all open sockets to really close...");
synchronized (lock) {
while ((_inSockets.size() != 0) || (_outSockets.size() != 0)) {
try {
lock.wait();
} catch (InterruptedException e) {}
}
}
try {
_log.debug(getName() + ": Destroying I2P session...");
_session.destroySession();
_log.debug(getName() + ": I2P session destroyed");
} catch (I2PSessionException e) {
_log.warn(getName() + ": Error destroying I2P session", e);
}
}
public void destroySocketManager();
/**
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
*
*/
public Set listSockets() {
Set sockets = new HashSet(8);
synchronized (lock) {
sockets.addAll(_inSockets.values());
sockets.addAll(_outSockets.values());
}
return sockets;
}
public Set listSockets();
/**
* Ping the specified peer, returning true if they replied to the ping within
* the timeout specified, false otherwise. This call blocks.
*
*/
public boolean ping(Destination peer, long timeoutMs) {
try {
return _session.sendMessage(peer, new byte[] { (byte) CHAFF});
} catch (I2PException ex) {
_log.warn(getName() + ": I2PException:", ex);
return false;
}
}
public boolean ping(Destination peer, long timeoutMs);
public void removeSocket(I2PSocketImpl sock) {
String localId = sock.getLocalID();
boolean removed = false;
synchronized (lock) {
removed = (null != _inSockets.remove(localId));
removed = removed || (null != _outSockets.remove(localId));
lock.notify();
}
long now = _context.clock().now();
long lifetime = now - sock.getCreatedOn();
long timeSinceClose = now - sock.getClosedOn();
long sent = sock.getBytesSent();
long recv = sock.getBytesReceived();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getName() + ": Removing socket \"" + getReadableForm(localId) + "\" [" + sock
+ ", send: " + sent + ", recv: " + recv
+ ", lifetime: " + lifetime + "ms, time since close: " + timeSinceClose
+ " removed? " + removed + ")]",
new Exception("removeSocket called"));
}
_context.statManager().addRateData("streaming.lifetime", lifetime, lifetime);
_context.statManager().addRateData("streaming.sent", sent, lifetime);
_context.statManager().addRateData("streaming.received", recv, lifetime);
if (sent > recv) {
_context.statManager().addRateData("streaming.transferBalance", 1, lifetime);
} else if (recv > sent) {
_context.statManager().addRateData("streaming.transferBalance", -1, lifetime);
} else {
// noop
}
}
public String getName() { return _name; }
public void setName(String name) { _name = name; }
public static String getReadableForm(String id) {
if (id == null) return "(null)";
if (id.length() != 3) return "Bogus";
return Base64.encode(toBytes(id));
}
/**
* Create a new part the connection ID that is locally unique
*
* @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE!
*/
private static String makeID(HashMap uniqueIn) {
String newID;
do {
int id = (int) (Math.random() * 16777215 + 1);
byte[] nid = new byte[3];
nid[0] = (byte) (id / 65536);
nid[1] = (byte) ((id / 256) % 256);
nid[2] = (byte) (id % 256);
newID = toString(nid);
} while (uniqueIn.get(newID) != null);
return newID;
}
/**
* Create a new packet of the given type for the specified connection containing
* the given payload
*/
public static byte[] makePacket(byte type, String id, byte[] payload) {
byte[] packet = new byte[payload.length + 4];
packet[0] = type;
byte[] temp = toBytes(id);
if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length);
System.arraycopy(temp, 0, packet, 1, 3);
System.arraycopy(payload, 0, packet, 4, payload.length);
return packet;
}
private static final String toString(byte data[]) {
try {
return new String(data, "ISO-8859-1");
} catch (UnsupportedEncodingException uee) {
throw new RuntimeException("WTF! iso-8859-1 isn't supported?");
}
}
private static final byte[] toBytes(String str) {
try {
return str.getBytes("ISO-8859-1");
} catch (UnsupportedEncodingException uee) {
throw new RuntimeException("WTF! iso-8859-1 isn't supported?");
}
}
public String getName();
public void setName(String name);
}

View File

@ -90,7 +90,7 @@ public class I2PSocketManagerFactory {
}
private static I2PSocketManager createManager(I2PSession session) {
I2PSocketManager mgr = new I2PSocketManager();
I2PSocketManagerImpl mgr = new I2PSocketManagerImpl();
mgr.setSession(session);
mgr.setDefaultOptions(new I2PSocketOptions());
return mgr;

View File

@ -0,0 +1,752 @@
/*
* licensed under BSD license...
* (if you know the proper clause for that, add it ...)
*/
package net.i2p.client.streaming;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
/**
* Centralize the coordination and multiplexing of the local client's streaming.
* There should be one I2PSocketManager for each I2PSession, and if an application
* is sending and receiving data through the streaming library using an
* I2PSocketManager, it should not attempt to call I2PSession's setSessionListener
* or receive any messages with its .receiveMessage
*
*/
public class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
private I2PAppContext _context;
private Log _log;
private I2PSession _session;
private I2PServerSocketImpl _serverSocket = null;
private Object lock = new Object(); // for locking socket lists
private HashMap _outSockets;
private HashMap _inSockets;
private I2PSocketOptions _defaultOptions;
private long _acceptTimeout;
private String _name;
private static int __managerId = 0;
public static final short ACK = 0x51;
public static final short CLOSE_OUT = 0x52;
public static final short DATA_OUT = 0x50;
public static final short SYN = 0xA1;
public static final short CLOSE_IN = 0xA2;
public static final short DATA_IN = 0xA0;
public static final short CHAFF = 0xFF;
/**
* How long to wait for the client app to accept() before sending back CLOSE?
* This includes the time waiting in the queue. Currently set to 5 seconds.
*/
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
public I2PSocketManagerImpl() {
this("SocketManager " + (++__managerId));
}
public I2PSocketManagerImpl(String name) {
_name = name;
_session = null;
_inSockets = new HashMap(16);
_outSockets = new HashMap(16);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(I2PSocketManager.class);
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.transferBalance", "How many streams send more than they receive (positive means more sent, negative means more received)?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.synNoAck", "How many times have we sent a SYN but not received an ACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.ackSendFailed", "How many times have we tried to send an ACK to a SYN and failed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.nackSent", "How many times have we refused a SYN with a NACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.nackReceived", "How many times have we received a NACK to our SYN?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public I2PSession getSession() {
return _session;
}
public void setSession(I2PSession session) {
_session = session;
if (session != null) session.setSessionListener(this);
}
/**
* How long should we wait for the client to .accept() a socket before
* sending back a NACK/Close?
*
* @param ms milliseconds to wait, maximum
*/
public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
public long getAcceptTimeout() { return _acceptTimeout; }
public void disconnected(I2PSession session) {
_log.info(getName() + ": Disconnected from the session");
destroySocketManager();
}
public void errorOccurred(I2PSession session, String message, Throwable error) {
_log.error(getName() + ": Error occurred: [" + message + "]", error);
}
public void messageAvailable(I2PSession session, int msgId, long size) {
try {
I2PSocketImpl s;
byte msg[] = session.receiveMessage(msgId);
if (msg.length == 1 && msg[0] == -1) {
_log.debug(getName() + ": Ping received");
return;
}
if (msg.length < 4) {
_log.warn(getName() + ": ==== packet too short ====");
return;
}
int type = msg[0] & 0xff;
String id = toString(new byte[] { msg[1], msg[2], msg[3]});
byte[] payload = new byte[msg.length - 4];
System.arraycopy(msg, 4, payload, 0, payload.length);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Message read: type = [" + Integer.toHexString(type)
+ "] id = [" + getReadableForm(id)
+ "] payload length: [" + payload.length + "]");
switch (type) {
case ACK:
ackAvailable(id, payload);
return;
case CLOSE_OUT:
disconnectAvailable(id, payload);
return;
case DATA_OUT:
sendOutgoingAvailable(id, payload);
return;
case SYN:
synIncomingAvailable(id, payload, session);
return;
case CLOSE_IN:
disconnectIncoming(id, payload);
return;
case DATA_IN:
sendIncoming(id, payload);
case CHAFF:
// ignore
return;
default:
handleUnknown(type, id, payload);
return;
}
} catch (I2PException ise) {
_log.warn(getName() + ": Error processing", ise);
} catch (IllegalStateException ise) {
_log.debug(getName() + ": Error processing", ise);
}
}
/**
* We've received an ACK packet (hopefully, in response to a SYN that we
* recently sent out). Notify the associated I2PSocket that we now have
* the remote stream ID (which should get things going, since the handshake
* is complete).
*
*/
private void ackAvailable(String id, byte payload[]) {
long begin = _context.clock().now();
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _outSockets.get(id);
}
if (s == null) {
_log.warn(getName() + ": No socket responsible for ACK packet for id " + getReadableForm(id));
return;
}
long socketRetrieved = _context.clock().now();
String remoteId = null;
remoteId = s.getRemoteID(false);
if ( (payload.length == 3) && (remoteId == null) ) {
String newID = toString(payload);
long beforeSetRemId = _context.clock().now();
s.setRemoteID(newID);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getName() + ": ackAvailable - socket retrieval took "
+ (socketRetrieved-begin) + "ms, getRemoteId took "
+ (beforeSetRemId-socketRetrieved) + "ms, setRemoteId took "
+ (_context.clock().now()-beforeSetRemId) + "ms");
}
return;
} else {
// (payload.length != 3 || getRemoteId != null)
if (_log.shouldLog(Log.WARN)) {
if (payload.length != 3)
_log.warn(getName() + ": Ack packet had " + payload.length + " bytes");
else
_log.warn(getName() + ": Remote ID already exists? " + remoteId);
}
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getName() + ": invalid ack - socket retrieval took "
+ (socketRetrieved-begin) + "ms, overall took "
+ (_context.clock().now()-begin) + "ms");
}
return;
}
}
/**
* We received a disconnect packet, telling us to tear down the specified
* stream.
*/
private void disconnectAvailable(String id, byte payload[]) {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _outSockets.get(id);
}
_log.debug(getName() + ": *Disconnect outgoing for socket " + s + " on id "
+ getReadableForm(id));
try {
if (s != null) {
if (payload.length > 0) {
_log.debug(getName() + ": Disconnect packet had "
+ payload.length + " bytes");
}
if (s.getRemoteID(false) == null) {
s.setRemoteID(null); // Just to wake up socket
return;
}
s.internalClose();
synchronized (lock) {
_outSockets.remove(id);
}
}
return;
} catch (Exception t) {
_log.warn(getName() + ": Ignoring error on disconnect for socket " + s, t);
}
}
/**
* We've received data on a stream we created - toss the data onto
* the socket for handling.
*
* @throws IllegalStateException if the socket isn't open or isn't known
*/
private void sendOutgoingAvailable(String id, byte payload[]) throws IllegalStateException {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _outSockets.get(id);
}
// packet send outgoing
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": *Packet send outgoing [" + payload.length + "] for socket "
+ s + " on id " + getReadableForm(id));
if (s != null) {
s.queueData(payload);
return;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Null socket with data available");
throw new IllegalStateException("Null socket with data available");
}
}
/**
* We've received a SYN packet (a request for a new stream). If the client has
* said they want incoming sockets (by retrieving the serverSocket), the stream
* will be ACKed, but if they have not, they'll be NACKed)
*
* @throws DataFormatException if the destination in the SYN was invalid
* @throws I2PSessionException if there was an I2P error sending the ACK or NACK
*/
private void synIncomingAvailable(String id, byte payload[], I2PSession session)
throws DataFormatException, I2PSessionException {
Destination d = new Destination();
d.fromByteArray(payload);
I2PSocketImpl s = null;
boolean acceptConnections = (_serverSocket != null);
String newLocalID = null;
synchronized (lock) {
newLocalID = makeID(_inSockets);
if (acceptConnections) {
s = new I2PSocketImpl(d, this, false, newLocalID);
s.setRemoteID(id);
}
}
_log.debug(getName() + ": *Syn! for socket " + s + " on id " + getReadableForm(newLocalID)
+ " from " + d.calculateHash().toBase64().substring(0,6));
if (!acceptConnections) {
// The app did not instantiate an I2PServerSocket
byte[] packet = makePacket((byte) CLOSE_OUT, id, toBytes(newLocalID));
boolean replySentOk = false;
synchronized (_session) {
replySentOk = _session.sendMessage(d, packet);
}
if (!replySentOk) {
_log.warn(getName() + ": Error sending close to " + d.calculateHash().toBase64()
+ " in response to a new con message",
new Exception("Failed creation"));
}
_context.statManager().addRateData("streaming.nackSent", 1, 1);
return;
}
if (_serverSocket.addWaitForAccept(s, _acceptTimeout)) {
_inSockets.put(newLocalID, s);
byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID));
boolean replySentOk = false;
replySentOk = _session.sendMessage(d, packet);
if (!replySentOk) {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Error sending reply to " + d.calculateHash().toBase64()
+ " in response to a new con message for socket " + s,
new Exception("Failed creation"));
s.internalClose();
_context.statManager().addRateData("streaming.ackSendFailed", 1, 1);
}
} else {
// timed out or serverSocket closed
byte[] packet = toBytes(" " + id);
packet[0] = CLOSE_OUT;
boolean nackSent = session.sendMessage(d, packet);
if (!nackSent) {
_log.warn(getName() + ": Error sending NACK for session creation for socket " + s);
}
s.internalClose();
_context.statManager().addRateData("streaming,nackSent", 1, 1);
}
return;
}
/**
* We've received a disconnect for a socket we didn't initiate, so kill
* the socket.
*
*/
private void disconnectIncoming(String id, byte payload[]) {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _inSockets.get(id);
if (payload.length == 0 && s != null) {
_inSockets.remove(id);
}
}
_log.debug(getName() + ": *Disconnect incoming for socket " + s);
try {
if (payload.length == 0 && s != null) {
s.internalClose();
return;
} else {
if ( (payload.length > 0) && (_log.shouldLog(Log.ERROR)) )
_log.warn(getName() + ": Disconnect packet had " + payload.length + " bytes");
if (s != null)
s.internalClose();
return;
}
} catch (Exception t) {
_log.warn(getName() + ": Ignoring error on disconnect", t);
return;
}
}
/**
* We've received data on a stream we received - toss the data onto
* the socket for handling.
*
* @throws IllegalStateException if the socket isn't open or isn't known
*/
private void sendIncoming(String id, byte payload[]) throws IllegalStateException {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _inSockets.get(id);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": *Packet send incoming [" + payload.length + "] for socket " + s);
if (s != null) {
s.queueData(payload);
return;
} else {
_log.info(getName() + ": Null socket with data available");
throw new IllegalStateException("Null socket with data available");
}
}
/**
* Unknown packet. moo.
*
*/
private void handleUnknown(int type, String id, byte payload[]) {
_log.error(getName() + ": \n\n=============== Unknown packet! " + "============"
+ "\nType: " + (int) type
+ "\nID: " + getReadableForm(id)
+ "\nBase64'ed Data: " + Base64.encode(payload)
+ "\n\n\n");
if (id != null) {
synchronized (lock) {
_inSockets.remove(id);
_outSockets.remove(id);
}
}
}
public void reportAbuse(I2PSession session, int severity) {
_log.error(getName() + ": Abuse reported [" + severity + "]");
}
public void setDefaultOptions(I2PSocketOptions options) {
_defaultOptions = options;
}
public I2PSocketOptions getDefaultOptions() {
return _defaultOptions;
}
public I2PServerSocket getServerSocket() {
if (_serverSocket == null) {
_serverSocket = new I2PServerSocketImpl(this);
}
return _serverSocket;
}
/**
* Create a new connected socket (block until the socket is created)
*
* @param peer Destination to connect to
* @param options I2P socket options to be used for connecting
*
* @throws ConnectException if the peer refuses the connection
* @throws NoRouteToHostException if the peer is not found or not reachable
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there is some other I2P-related problem
*/
public I2PSocket connect(Destination peer, I2PSocketOptions options)
throws I2PException, ConnectException,
NoRouteToHostException, InterruptedIOException {
String localID, lcID;
I2PSocketImpl s;
synchronized (lock) {
localID = makeID(_outSockets);
lcID = getReadableForm(localID);
s = new I2PSocketImpl(peer, this, true, localID);
_outSockets.put(localID, s);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": connect(" + peer.calculateHash().toBase64().substring(0,6)
+ ", ...): localID = " + lcID);
try {
ByteArrayOutputStream pubkey = new ByteArrayOutputStream();
_session.getMyDestination().writeBytes(pubkey);
String remoteID;
byte[] packet = makePacket((byte) SYN, localID, pubkey.toByteArray());
boolean sent = false;
sent = _session.sendMessage(peer, packet);
if (!sent) {
_log.info(getName() + ": Unable to send & receive ack for SYN packet for socket "
+ s + " with localID = " + lcID);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new I2PException("Error sending through I2P network");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": syn sent ok to "
+ peer.calculateHash().toBase64().substring(0,6)
+ " with localID = " + lcID);
}
if (options != null)
remoteID = s.getRemoteID(true, options.getConnectTimeout());
else
remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout());
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": remoteID received from "
+ peer.calculateHash().toBase64().substring(0,6)
+ ": " + getReadableForm(remoteID)
+ " with localID = " + lcID);
if (remoteID == null) {
_context.statManager().addRateData("streaming.nackReceived", 1, 1);
throw new ConnectException("Connection refused by peer for socket " + s);
}
if ("".equals(remoteID)) {
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new NoRouteToHostException("Unable to reach peer for socket " + s);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": TIMING: s given out for remoteID "
+ getReadableForm(remoteID) + " for socket " + s);
return s;
} catch (InterruptedIOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Timeout waiting for ack from syn for id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ioe);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
s.internalClose();
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new InterruptedIOException("Timeout waiting for ack");
} catch (ConnectException ex) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Connection error waiting for ack from syn for id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
s.internalClose();
throw ex;
} catch (NoRouteToHostException ex) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": No route to host waiting for ack from syn for id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
s.internalClose();
throw ex;
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(getName() + ": Error sending syn on id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
s.internalClose();
throw new I2PException("Unhandled IOException occurred");
} catch (I2PException ex) {
if (_log.shouldLog(Log.INFO))
_log.info(getName() + ": Error sending syn on id "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, ex);
synchronized (lock) {
_outSockets.remove(s.getLocalID());
}
s.internalClose();
throw ex;
} catch (Exception e) {
s.internalClose();
_log.warn(getName() + ": Unhandled error connecting on "
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
+ " for socket " + s, e);
throw new ConnectException("Unhandled error connecting: " + e.getMessage());
}
}
/**
* Create a new connected socket (block until the socket is created)
*
* @param peer Destination to connect to
*
* @throws ConnectException if the peer refuses the connection
* @throws NoRouteToHostException if the peer is not found or not reachable
* @throws InterruptedIOException if the connection timeouts
* @throws I2PException if there is some other I2P-related problem
*/
public I2PSocket connect(Destination peer) throws I2PException, ConnectException,
NoRouteToHostException, InterruptedIOException {
return connect(peer, null);
}
/**
* Destroy the socket manager, freeing all the associated resources. This
* method will block untill all the managed sockets are closed.
*
*/
public void destroySocketManager() {
if (_serverSocket != null) {
_serverSocket.close();
_serverSocket = null;
}
synchronized (lock) {
Iterator iter;
String id = null;
I2PSocketImpl sock;
iter = _inSockets.keySet().iterator();
while (iter.hasNext()) {
id = (String)iter.next();
sock = (I2PSocketImpl)_inSockets.get(id);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Closing inSocket \""
+ getReadableForm(sock.getLocalID()) + "\"");
sock.internalClose();
}
iter = _outSockets.keySet().iterator();
while (iter.hasNext()) {
id = (String)iter.next();
sock = (I2PSocketImpl)_outSockets.get(id);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getName() + ": Closing outSocket \""
+ getReadableForm(sock.getLocalID()) + "\"");
sock.internalClose();
}
}
_log.debug(getName() + ": Waiting for all open sockets to really close...");
synchronized (lock) {
while ((_inSockets.size() != 0) || (_outSockets.size() != 0)) {
try {
lock.wait();
} catch (InterruptedException e) {}
}
}
try {
_log.debug(getName() + ": Destroying I2P session...");
_session.destroySession();
_log.debug(getName() + ": I2P session destroyed");
} catch (I2PSessionException e) {
_log.warn(getName() + ": Error destroying I2P session", e);
}
}
/**
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
*
*/
public Set listSockets() {
Set sockets = new HashSet(8);
synchronized (lock) {
sockets.addAll(_inSockets.values());
sockets.addAll(_outSockets.values());
}
return sockets;
}
/**
* Ping the specified peer, returning true if they replied to the ping within
* the timeout specified, false otherwise. This call blocks.
*
*/
public boolean ping(Destination peer, long timeoutMs) {
try {
return _session.sendMessage(peer, new byte[] { (byte) CHAFF});
} catch (I2PException ex) {
_log.warn(getName() + ": I2PException:", ex);
return false;
}
}
public void removeSocket(I2PSocketImpl sock) {
String localId = sock.getLocalID();
boolean removed = false;
synchronized (lock) {
removed = (null != _inSockets.remove(localId));
removed = removed || (null != _outSockets.remove(localId));
lock.notify();
}
long now = _context.clock().now();
long lifetime = now - sock.getCreatedOn();
long timeSinceClose = now - sock.getClosedOn();
long sent = sock.getBytesSent();
long recv = sock.getBytesReceived();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getName() + ": Removing socket \"" + getReadableForm(localId) + "\" [" + sock
+ ", send: " + sent + ", recv: " + recv
+ ", lifetime: " + lifetime + "ms, time since close: " + timeSinceClose
+ " removed? " + removed + ")]",
new Exception("removeSocket called"));
}
_context.statManager().addRateData("streaming.lifetime", lifetime, lifetime);
_context.statManager().addRateData("streaming.sent", sent, lifetime);
_context.statManager().addRateData("streaming.received", recv, lifetime);
if (sent > recv) {
_context.statManager().addRateData("streaming.transferBalance", 1, lifetime);
} else if (recv > sent) {
_context.statManager().addRateData("streaming.transferBalance", -1, lifetime);
} else {
// noop
}
}
public String getName() { return _name; }
public void setName(String name) { _name = name; }
public static String getReadableForm(String id) {
if (id == null) return "(null)";
if (id.length() != 3) return "Bogus";
return Base64.encode(toBytes(id));
}
/**
* Create a new part the connection ID that is locally unique
*
* @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE!
*/
private static String makeID(HashMap uniqueIn) {
String newID;
do {
int id = (int) (Math.random() * 16777215 + 1);
byte[] nid = new byte[3];
nid[0] = (byte) (id / 65536);
nid[1] = (byte) ((id / 256) % 256);
nid[2] = (byte) (id % 256);
newID = toString(nid);
} while (uniqueIn.get(newID) != null);
return newID;
}
/**
* Create a new packet of the given type for the specified connection containing
* the given payload
*/
public static byte[] makePacket(byte type, String id, byte[] payload) {
byte[] packet = new byte[payload.length + 4];
packet[0] = type;
byte[] temp = toBytes(id);
if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length);
System.arraycopy(temp, 0, packet, 1, 3);
System.arraycopy(payload, 0, packet, 4, payload.length);
return packet;
}
private static final String toString(byte data[]) {
try {
return new String(data, "ISO-8859-1");
} catch (UnsupportedEncodingException uee) {
throw new RuntimeException("WTF! iso-8859-1 isn't supported?");
}
}
private static final byte[] toBytes(String str) {
try {
return str.getBytes("ISO-8859-1");
} catch (UnsupportedEncodingException uee) {
throw new RuntimeException("WTF! iso-8859-1 isn't supported?");
}
}
}

View File

@ -1,5 +1,7 @@
package net.i2p.client.streaming;
import java.util.Properties;
/**
* Define the configuration for streaming and verifying data on the socket.
*
@ -19,7 +21,18 @@ public class I2PSocketOptions {
_writeTimeout = DEFAULT_WRITE_TIMEOUT;
_maxBufferSize = DEFAULT_BUFFER_SIZE;
}
public I2PSocketOptions(I2PSocketOptions opts) {
_connectTimeout = opts.getConnectTimeout();
_readTimeout = opts.getReadTimeout();
_writeTimeout = opts.getWriteTimeout();
_maxBufferSize = opts.getMaxBufferSize();
}
public I2PSocketOptions(Properties opts) {
}
/**
* How long we will wait for the ACK from a SYN, in milliseconds.
*

View File

@ -15,9 +15,8 @@ import java.io.Serializable;
* Wrap up an array of bytes so that they can be compared and placed in hashes,
* maps, and the like.
*
* @author jrandom
*/
public class ByteArray implements Serializable {
public class ByteArray implements Serializable, Comparable {
private byte[] _data;
public ByteArray() {
@ -28,7 +27,7 @@ public class ByteArray implements Serializable {
_data = data;
}
public byte[] getData() {
public final byte[] getData() {
return _data;
}
@ -36,7 +35,7 @@ public class ByteArray implements Serializable {
_data = data;
}
public boolean equals(Object o) {
public final boolean equals(Object o) {
if (o == null) return false;
if (o instanceof ByteArray) {
return compare(getData(), ((ByteArray) o).getData());
@ -50,15 +49,20 @@ public class ByteArray implements Serializable {
}
}
private boolean compare(byte[] lhs, byte[] rhs) {
private static final boolean compare(byte[] lhs, byte[] rhs) {
return DataHelper.eq(lhs, rhs);
}
public final int compareTo(Object obj) {
if (obj.getClass() != getClass()) throw new ClassCastException("invalid object: " + obj);
return DataHelper.compareTo(_data, ((ByteArray)obj).getData());
}
public int hashCode() {
public final int hashCode() {
return DataHelper.hashCode(getData());
}
public String toString() {
public final String toString() {
return DataHelper.toString(getData(), 32);
}
}

View File

@ -102,10 +102,11 @@ public class Destination extends DataStructureImpl {
_signingKey = new SigningPublicKey();
buf = new byte[SigningPublicKey.KEYSIZE_BYTES];
System.arraycopy(source, cur, buf, 0, SigningPublicKey.KEYSIZE_BYTES);
_signingKey.setData(buf);
cur += SigningPublicKey.KEYSIZE_BYTES;
_certificate = new Certificate();
cur += _certificate.readBytes(buf, cur);
cur += _certificate.readBytes(source, cur);
return cur - offset;
}

View File

@ -1,4 +1,9 @@
$Id: history.txt,v 1.53 2004/10/17 16:03:06 jrandom Exp $
$Id: history.txt,v 1.54 2004/10/18 14:08:01 jrandom Exp $
2004-10-23 jrandom
* Minor ministreaming lib refactoring to simplify integration of the full
streaming lib.
* Minor bugfixes to data structure serialization.
* 2004-10-18 0.4.1.3 released

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.59 $ $Date: 2004/10/17 16:03:05 $";
public final static String ID = "$Revision: 1.60 $ $Date: 2004/10/18 14:08:00 $";
public final static String VERSION = "0.4.1.3";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);