forked from I2P_Developers/i2p.i2p
NTCP: More efficient copying of inbound establish state data
More prep for NTCP2
This commit is contained in:
@ -408,18 +408,29 @@ class EventPumper implements Runnable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Called by the connection when it has data ready to write.
|
||||
* If we have bandwidth, calls con.Write() which calls wantsWrite(con).
|
||||
* If no bandwidth, calls con.queuedWrite().
|
||||
*/
|
||||
public void wantsWrite(NTCPConnection con, byte data[]) {
|
||||
ByteBuffer buf = ByteBuffer.wrap(data);
|
||||
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, 0, "NTCP write");//con, buf);
|
||||
wantsWrite(con, data, 0, data.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the connection when it has data ready to write.
|
||||
* If we have bandwidth, calls con.Write() which calls wantsWrite(con).
|
||||
* If no bandwidth, calls con.queuedWrite().
|
||||
*
|
||||
* @since 0.9.35 off/len version
|
||||
*/
|
||||
public void wantsWrite(NTCPConnection con, byte data[], int off, int len) {
|
||||
ByteBuffer buf = ByteBuffer.wrap(data, off, len);
|
||||
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, 0, "NTCP write");//con, buf);
|
||||
if (req.getPendingRequested() > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("queued write on " + con + " for " + data.length);
|
||||
_log.info("queued write on " + con + " for " + len);
|
||||
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1);
|
||||
con.queuedWrite(buf, req);
|
||||
} else {
|
||||
|
@ -27,12 +27,6 @@ import net.i2p.util.SimpleByteCache;
|
||||
*/
|
||||
class InboundEstablishState extends EstablishBase {
|
||||
|
||||
/**
|
||||
* next index in _curEncrypted to write to (equals _curEncrypted length if the block is
|
||||
* ready to decrypt)
|
||||
*/
|
||||
private int _curEncryptedOffset;
|
||||
|
||||
/** current encrypted block we are reading (IB only) or an IV buf used at the end for OB */
|
||||
private byte _curEncrypted[];
|
||||
|
||||
@ -40,10 +34,12 @@ class InboundEstablishState extends EstablishBase {
|
||||
private RouterIdentity _aliceIdent;
|
||||
|
||||
/** contains the decrypted aliceIndexSize + aliceIdent + tsA + padding + aliceSig */
|
||||
private ByteArrayOutputStream _sz_aliceIdent_tsA_padding_aliceSig;
|
||||
private final ByteArrayOutputStream _sz_aliceIdent_tsA_padding_aliceSig;
|
||||
|
||||
/** how long we expect _sz_aliceIdent_tsA_padding_aliceSig to be when its full */
|
||||
private int _sz_aliceIdent_tsA_padding_aliceSigSize;
|
||||
|
||||
private static final int NTCP1_MSG1_SIZE = XY_SIZE + HXY_SIZE;
|
||||
|
||||
public InboundEstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
|
||||
super(ctx, transport, con);
|
||||
@ -75,7 +71,16 @@ class InboundEstablishState extends EstablishBase {
|
||||
* @return 1, 2, or 0 if unknown
|
||||
* @since 0.9.35
|
||||
*/
|
||||
public int getVersion() { return 1; }
|
||||
public int getVersion() {
|
||||
if (!_transport.isNTCP2Enabled())
|
||||
return 1;
|
||||
synchronized (_stateLock) {
|
||||
if (_state == State.IB_INIT)
|
||||
return 0;
|
||||
// TODO NTCP2 states
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* we are Bob, so receive these bytes as part of an inbound connection
|
||||
@ -90,19 +95,28 @@ class InboundEstablishState extends EstablishBase {
|
||||
* is synchronized, should be OK. See isComplete()
|
||||
*/
|
||||
private void receiveInbound(ByteBuffer src) {
|
||||
while (_state == State.IB_INIT && src.hasRemaining()) {
|
||||
byte c = src.get();
|
||||
_X[_received++] = c;
|
||||
if (_received >= XY_SIZE)
|
||||
changeState(State.IB_GOT_X);
|
||||
if (_state == State.IB_INIT && src.hasRemaining()) {
|
||||
int remaining = src.remaining();
|
||||
//if (remaining < NTCP1_MSG1_SIZE && _transport.isNTCP2Enabled()) {
|
||||
// // NTCP2
|
||||
//}
|
||||
int toGet = Math.min(remaining, XY_SIZE - _received);
|
||||
src.get(_X, _received, toGet);
|
||||
_received += toGet;
|
||||
if (_received < XY_SIZE)
|
||||
return;
|
||||
changeState(State.IB_GOT_X);
|
||||
_received = 0;
|
||||
}
|
||||
while (_state == State.IB_GOT_X && src.hasRemaining()) {
|
||||
int i = _received - XY_SIZE;
|
||||
_received++;
|
||||
byte c = src.get();
|
||||
_hX_xor_bobIdentHash[i] = c;
|
||||
if (i >= HXY_SIZE - 1)
|
||||
changeState(State.IB_GOT_HX);
|
||||
|
||||
if (_state == State.IB_GOT_X && src.hasRemaining()) {
|
||||
int toGet = Math.min(src.remaining(), HXY_SIZE - _received);
|
||||
src.get(_hX_xor_bobIdentHash, _received, toGet);
|
||||
_received += toGet;
|
||||
if (_received < HXY_SIZE)
|
||||
return;
|
||||
changeState(State.IB_GOT_HX);
|
||||
_received = 0;
|
||||
}
|
||||
|
||||
if (_state == State.IB_GOT_HX) {
|
||||
@ -184,19 +198,28 @@ class InboundEstablishState extends EstablishBase {
|
||||
_state == State.IB_GOT_RI) && src.hasRemaining()) {
|
||||
|
||||
// Collect a 16-byte block
|
||||
while (_curEncryptedOffset < AES_SIZE && src.hasRemaining()) {
|
||||
_curEncrypted[_curEncryptedOffset++] = src.get();
|
||||
_received++;
|
||||
if (_received < AES_SIZE && src.hasRemaining()) {
|
||||
int toGet = Math.min(src.remaining(), AES_SIZE - _received);
|
||||
src.get(_curEncrypted, _received, toGet);
|
||||
_received += toGet;
|
||||
if (_received < AES_SIZE) {
|
||||
// no more bytes available in the buffer, and only a partial
|
||||
// block was read, so we can't decrypt it.
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(prefix() + "end of available data with only a partial block read (" +
|
||||
+ _received + ")");
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Decrypt the 16-byte block
|
||||
if (_curEncryptedOffset >= AES_SIZE) {
|
||||
if (_received >= AES_SIZE) {
|
||||
_context.aes().decrypt(_curEncrypted, 0, _curDecrypted, 0, _dh.getSessionKey(),
|
||||
_prevEncrypted, 0, AES_SIZE);
|
||||
|
||||
byte swap[] = _prevEncrypted;
|
||||
_prevEncrypted = _curEncrypted;
|
||||
_curEncrypted = swap;
|
||||
_curEncryptedOffset = 0;
|
||||
_received = 0;
|
||||
|
||||
if (_state == State.IB_SENT_Y) { // we are on the first decrypted block
|
||||
int sz = (int)DataHelper.fromLong(_curDecrypted, 0, 2);
|
||||
@ -269,11 +292,6 @@ class InboundEstablishState extends EstablishBase {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// no more bytes available in the buffer, and only a partial
|
||||
// block was read, so we can't decrypt it.
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(prefix() + "end of available data with only a partial block read (" +
|
||||
_curEncryptedOffset + ", " + _received + ")");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,7 +124,7 @@ class NTCP2Payload {
|
||||
* @param payload writes to it starting at off
|
||||
* @return the new offset
|
||||
*/
|
||||
public int writePayload(byte[] payload, int off, List<Block> blocks) {
|
||||
public static int writePayload(byte[] payload, int off, List<Block> blocks) {
|
||||
for (Block block : blocks) {
|
||||
off = block.write(payload, off);
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ public class NTCPTransport extends TransportImpl {
|
||||
private boolean _enableNTCP2;
|
||||
private static final String NTCP2_PROTO_SHORT = "NXK2CS";
|
||||
private static final String OPT_NTCP2_SK = 'N' + NTCP2_PROTO_SHORT + "2s";
|
||||
private static final int NTCP2_INT_VERSION = 2;
|
||||
static final int NTCP2_INT_VERSION = 2;
|
||||
private static final String NTCP2_VERSION = Integer.toString(NTCP2_INT_VERSION);
|
||||
/** b64 static private key */
|
||||
private static final String PROP_NTCP2_SP = "i2np.ntcp2.sp";
|
||||
@ -1090,6 +1090,15 @@ public class NTCPTransport extends TransportImpl {
|
||||
*/
|
||||
boolean isNTCP2Enabled() { return _enableNTCP2; }
|
||||
|
||||
/**
|
||||
* The static priv key
|
||||
*
|
||||
* @since 0.9.35
|
||||
*/
|
||||
byte[] getNTCP2StaticPrivkey() {
|
||||
return _ntcp2StaticPrivkey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the valid NTCP version of this NTCP address.
|
||||
*
|
||||
|
Reference in New Issue
Block a user