Skip to content

Commit

Permalink
Merge branch 'i2ptunnel-keepalive-client' into 'master'
Browse files Browse the repository at this point in the history
Implement client-side HTTP persistence (keepalive) for the browser-to-client proxy socket

See merge request i2p-hackers/i2p.i2p!176
  • Loading branch information
zzz committed Feb 2, 2024
2 parents 3922795 + 23937c1 commit 7040d32
Show file tree
Hide file tree
Showing 5 changed files with 665 additions and 143 deletions.
152 changes: 139 additions & 13 deletions apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.i2ptunnel.util.GunzipOutputStream;
import net.i2p.i2ptunnel.util.*;
import net.i2p.i2ptunnel.util.LimitOutputStream.DoneCallback;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;

Expand All @@ -38,11 +39,13 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private boolean _headerWritten;
private final byte _buf1[];
protected boolean _gzip;
protected long _dataExpected;
protected long _dataExpected = -1;
protected boolean _keepAliveIn, _keepAliveOut;
/** lower-case, trimmed */
protected String _contentType;
/** lower-case, trimmed */
protected String _contentEncoding;
private final DoneCallback _callback;

private static final int CACHE_SIZE = 4*1024;
private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
Expand All @@ -54,11 +57,66 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private static final byte[] CRLF = DataHelper.getASCII("\r\n");

public HTTPResponseOutputStream(OutputStream raw) {
this(raw, null);
}

/**
* Optionally call callback when we're done.
*
* @param cb may be null
* @since 0.9.62
*/
private HTTPResponseOutputStream(OutputStream raw, DoneCallback cb) {
super(raw);
I2PAppContext context = I2PAppContext.getGlobalContext();
_log = context.logManager().getLog(getClass());
_log = context.logManager().getLog(HTTPResponseOutputStream.class);
_headerBuffer = _cache.acquire();
_buf1 = new byte[1];
_callback = cb;
}

/**
* Optionally keep sockets alive and call callback when we're done.
*
* @param allowKeepAliveIn We may, but are not required to, keep the input socket alive.
* This is the server on the server side and I2P on the client side.
* @param allowKeepAliveOut We may, but are not required to, keep the output socket alive.
* This is I2P on the server side and the browser on the client side.
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
* @param cb non-null if allowKeepAlive is true
* @since 0.9.62
*/
public HTTPResponseOutputStream(OutputStream raw, boolean allowKeepAliveIn, boolean allowKeepAliveOut,
boolean isHead, DoneCallback cb) {
this(raw, cb);
_keepAliveIn = allowKeepAliveIn;
_keepAliveOut = allowKeepAliveOut;
if (isHead)
_dataExpected = 0;
if (_log.shouldInfo())
_log.info("Before headers: keepaliveIn? " + allowKeepAliveIn + " keepaliveOut? " + allowKeepAliveOut);
}

/**
* Should we keep the input stream alive when done?
*
* @return false before the headers are written
* @since 0.9.62
*/
public boolean getKeepAliveIn() {
return _keepAliveIn && _headerWritten;
}

/**
* Should we keep the output stream alive when done?
* Only supported for the browser socket side.
* I2P socket on server side not supported yet.
*
* @return false before the headers are written
* @since 0.9.62
*/
public boolean getKeepAliveOut() {
return _keepAliveOut && _headerWritten;
}

@Override
Expand Down Expand Up @@ -130,7 +188,7 @@ private boolean headerReceived() {
byte second = data[valid - 2];
return second == NL; // \n\n
}

/**
* Possibly tweak that first HTTP response line (HTTP/1.0 200 OK, etc).
* Overridden on server side.
Expand All @@ -142,21 +200,37 @@ protected String filterResponseLine(String line) {

/** ok, received, now munge & write it */
private void writeHeader() throws IOException {
String responseLine = null;

boolean connectionSent = false;
boolean chunked = false;

int lastEnd = -1;
byte[] data = _headerBuffer.getData();
int valid = _headerBuffer.getValid();
for (int i = 0; i < valid; i++) {
if (data[i] == NL) {
if (lastEnd == -1) {
responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
responseLine = filterResponseLine(responseLine);
String responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
responseLine = (responseLine.trim() + "\r\n");
if (_log.shouldInfo())
_log.info("Response: " + responseLine.trim());
// Persistent conn requires HTTP/1.1
if (!responseLine.startsWith("HTTP/1.1 ")) {
_keepAliveIn = false;
_keepAliveOut = false;
}
// force zero datalen for 1xx, 204, 304 (RFC 2616 sec. 4.4)
// so that these don't prevent keepalive
int sp = responseLine.indexOf(" ");
if (sp > 0) {
String s = responseLine.substring(sp + 1);
if (s.startsWith("1") || s.startsWith("204") || s.startsWith("304"))
_dataExpected = 0;
} else {
// no status?
_keepAliveIn = false;
_keepAliveOut = false;
}

out.write(DataHelper.getUTF8(responseLine));
} else {
for (int j = lastEnd+1; j < i; j++) {
Expand All @@ -180,14 +254,26 @@ private void writeHeader() throws IOException {
if (val.toLowerCase(Locale.US).contains("upgrade")) {
// pass through for websocket
out.write(DataHelper.getASCII("Connection: " + val + "\r\n"));
// Disable persistence
_keepAliveOut = false;
} else {
out.write(CONNECTION_CLOSE);
// Strip to allow persistence, replace to disallow
if (!_keepAliveOut)
out.write(CONNECTION_CLOSE);
}
// We do not expect Connection: keep-alive here,
// as it's the default for HTTP/1.1, the server proxy doesn't support it,
// and we don't support keepalive for HTTP/1.0
_keepAliveIn = false;
connectionSent = true;
} else if ("proxy-connection".equals(lcKey)) {
// Nonstandard, strip
} else if ("content-encoding".equals(lcKey) && "x-i2p-gzip".equals(val.toLowerCase(Locale.US))) {
_gzip = true;
// client side only
// x-i2p-gzip is not chunked, which is nonstandard, but we track the
// end of data in GunzipOutputStream and call the callback,
// so we can support i2p-side keepalive here.
} else if ("proxy-authenticate".equals(lcKey)) {
// filter this hop-by-hop header; outproxy authentication must be configured in I2PTunnelHTTPClient
// see e.g. http://blog.c22.cc/2013/03/11/privoxy-proxy-authentication-credential-exposure-cve-2013-2503/
Expand All @@ -203,6 +289,9 @@ private void writeHeader() throws IOException {
} else if ("content-encoding".equals(lcKey)) {
// save for compress decision on server side
_contentEncoding = val.toLowerCase(Locale.US);
} else if ("transfer-encoding".equals(lcKey) && val.toLowerCase(Locale.US).contains("chunked")) {
// save for keepalive decision on client side
chunked = true;
} else if ("set-cookie".equals(lcKey)) {
String lcVal = val.toLowerCase(Locale.US);
if (lcVal.contains("domain=b32.i2p") ||
Expand All @@ -225,19 +314,55 @@ private void writeHeader() throws IOException {
lastEnd = i;
}
}

// Now make the final keepalive decisions
if (_keepAliveOut) {
// we need one but not both
if ((chunked && _dataExpected >= 0) ||
(!chunked && _dataExpected < 0))
_keepAliveOut = false;
}
if (_keepAliveIn) {
// we need one but not both
if ((chunked && _dataExpected >= 0) ||
(!chunked && _dataExpected < 0))
_keepAliveIn = false;
}

if (!connectionSent)
if (!connectionSent && !_keepAliveOut)
out.write(CONNECTION_CLOSE);

finishHeaders();

boolean shouldCompress = shouldCompress();
if (_log.shouldInfo())
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress);
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress + " keepaliveIn? " + _keepAliveIn + " keepaliveOut? " + _keepAliveOut);

if (data.length == CACHE_SIZE)
_cache.release(_headerBuffer);
_headerBuffer = null;

// Setup the keepalive streams
// Until we have keepalive for the i2p socket, the client side
// does not need to do this, we just wait for the socket to close.
// Until we have keepalive for the server socket, the server side
// does not need to do this, we just wait for the socket to close.
if (_keepAliveIn && !shouldCompress) {
if (_dataExpected > 0) {
// content-length
// filter output stream to count the data
out = new ByteLimitOutputStream(out, _callback, _dataExpected);
} else if (_dataExpected == 0) {
if (_callback != null)
_callback.streamDone();
} else {
// -1, chunked
// filter output stream to look for the end
// do not strip the chunking; pass it through
out = new DechunkedOutputStream(out, _callback, false);
}
}

if (shouldCompress) {
beginProcessing();
}
Expand All @@ -252,15 +377,16 @@ protected void finishHeaders() throws IOException {
@Override
public void close() throws IOException {
if (_log.shouldInfo())
_log.info("Closing " + out + " compressed? " + shouldCompress(), new Exception("I did it"));
_log.info("Closing " + out + " headers written? " + _headerWritten + " compressed? " + shouldCompress() +
" keepaliveIn? " + _keepAliveIn + " keepaliveOut? " + _keepAliveOut, new Exception("I did it"));
synchronized(this) {
// synch with changing out field below
super.close();
}
}

protected void beginProcessing() throws IOException {
OutputStream po = new GunzipOutputStream(out);
OutputStream po = new GunzipOutputStream(out, _callback);
synchronized(this) {
out = po;
}
Expand Down
Loading

0 comments on commit 7040d32

Please sign in to comment.