From 23937c11de0651979c27e2ecd4ca49a24b16d392 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 2 Feb 2024 15:42:58 +0000 Subject: [PATCH] Implement client-side HTTP persistence (keepalive) for the browser-to-client proxy socket --- .../i2ptunnel/HTTPResponseOutputStream.java | 152 ++++++++++- .../i2p/i2ptunnel/I2PTunnelHTTPClient.java | 228 ++++++++++++----- .../i2ptunnel/I2PTunnelHTTPClientRunner.java | 179 +++++++++++-- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 236 ++++++++++++++---- .../src/net/i2p/i2ptunnel/I2PTunnelTask.java | 13 + 5 files changed, 665 insertions(+), 143 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 0fd3c538b7..fe8a78e2ae 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -16,7 +16,8 @@ import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; -import net.i2p.i2ptunnel.util.GunzipOutputStream; +import net.i2p.i2ptunnel.util.*; +import net.i2p.i2ptunnel.util.LimitOutputStream.DoneCallback; import net.i2p.util.ByteCache; import net.i2p.util.Log; @@ -38,11 +39,13 @@ class HTTPResponseOutputStream extends FilterOutputStream { private boolean _headerWritten; private final byte _buf1[]; protected boolean _gzip; - protected long _dataExpected; + protected long _dataExpected = -1; + protected boolean _keepAliveIn, _keepAliveOut; /** lower-case, trimmed */ protected String _contentType; /** lower-case, trimmed */ protected String _contentEncoding; + private final DoneCallback _callback; private static final int CACHE_SIZE = 4*1024; private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE); @@ -54,11 +57,66 @@ class HTTPResponseOutputStream extends FilterOutputStream { private static final byte[] CRLF = DataHelper.getASCII("\r\n"); public HTTPResponseOutputStream(OutputStream raw) { + this(raw, null); + } + + /** + * Optionally call callback when we're done. + * + * @param cb may be null + * @since 0.9.62 + */ + private HTTPResponseOutputStream(OutputStream raw, DoneCallback cb) { super(raw); I2PAppContext context = I2PAppContext.getGlobalContext(); - _log = context.logManager().getLog(getClass()); + _log = context.logManager().getLog(HTTPResponseOutputStream.class); _headerBuffer = _cache.acquire(); _buf1 = new byte[1]; + _callback = cb; + } + + /** + * Optionally keep sockets alive and call callback when we're done. + * + * @param allowKeepAliveIn We may, but are not required to, keep the input socket alive. + * This is the server on the server side and I2P on the client side. + * @param allowKeepAliveOut We may, but are not required to, keep the output socket alive. + * This is I2P on the server side and the browser on the client side. + * @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4) + * @param cb non-null if allowKeepAlive is true + * @since 0.9.62 + */ + public HTTPResponseOutputStream(OutputStream raw, boolean allowKeepAliveIn, boolean allowKeepAliveOut, + boolean isHead, DoneCallback cb) { + this(raw, cb); + _keepAliveIn = allowKeepAliveIn; + _keepAliveOut = allowKeepAliveOut; + if (isHead) + _dataExpected = 0; + if (_log.shouldInfo()) + _log.info("Before headers: keepaliveIn? " + allowKeepAliveIn + " keepaliveOut? " + allowKeepAliveOut); + } + + /** + * Should we keep the input stream alive when done? + * + * @return false before the headers are written + * @since 0.9.62 + */ + public boolean getKeepAliveIn() { + return _keepAliveIn && _headerWritten; + } + + /** + * Should we keep the output stream alive when done? + * Only supported for the browser socket side. + * I2P socket on server side not supported yet. + * + * @return false before the headers are written + * @since 0.9.62 + */ + public boolean getKeepAliveOut() { + return _keepAliveOut && _headerWritten; } @Override @@ -130,7 +188,7 @@ private boolean headerReceived() { byte second = data[valid - 2]; return second == NL; // \n\n } - + /** * Possibly tweak that first HTTP response line (HTTP/1.0 200 OK, etc). * Overridden on server side. @@ -142,9 +200,8 @@ protected String filterResponseLine(String line) { /** ok, received, now munge & write it */ private void writeHeader() throws IOException { - String responseLine = null; - boolean connectionSent = false; + boolean chunked = false; int lastEnd = -1; byte[] data = _headerBuffer.getData(); @@ -152,11 +209,28 @@ private void writeHeader() throws IOException { for (int i = 0; i < valid; i++) { if (data[i] == NL) { if (lastEnd == -1) { - responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL - responseLine = filterResponseLine(responseLine); + String responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL responseLine = (responseLine.trim() + "\r\n"); if (_log.shouldInfo()) _log.info("Response: " + responseLine.trim()); + // Persistent conn requires HTTP/1.1 + if (!responseLine.startsWith("HTTP/1.1 ")) { + _keepAliveIn = false; + _keepAliveOut = false; + } + // force zero datalen for 1xx, 204, 304 (RFC 2616 sec. 4.4) + // so that these don't prevent keepalive + int sp = responseLine.indexOf(" "); + if (sp > 0) { + String s = responseLine.substring(sp + 1); + if (s.startsWith("1") || s.startsWith("204") || s.startsWith("304")) + _dataExpected = 0; + } else { + // no status? + _keepAliveIn = false; + _keepAliveOut = false; + } + out.write(DataHelper.getUTF8(responseLine)); } else { for (int j = lastEnd+1; j < i; j++) { @@ -180,14 +254,26 @@ private void writeHeader() throws IOException { if (val.toLowerCase(Locale.US).contains("upgrade")) { // pass through for websocket out.write(DataHelper.getASCII("Connection: " + val + "\r\n")); + // Disable persistence + _keepAliveOut = false; } else { - out.write(CONNECTION_CLOSE); + // Strip to allow persistence, replace to disallow + if (!_keepAliveOut) + out.write(CONNECTION_CLOSE); } + // We do not expect Connection: keep-alive here, + // as it's the default for HTTP/1.1, the server proxy doesn't support it, + // and we don't support keepalive for HTTP/1.0 + _keepAliveIn = false; connectionSent = true; } else if ("proxy-connection".equals(lcKey)) { // Nonstandard, strip } else if ("content-encoding".equals(lcKey) && "x-i2p-gzip".equals(val.toLowerCase(Locale.US))) { _gzip = true; + // client side only + // x-i2p-gzip is not chunked, which is nonstandard, but we track the + // end of data in GunzipOutputStream and call the callback, + // so we can support i2p-side keepalive here. } else if ("proxy-authenticate".equals(lcKey)) { // filter this hop-by-hop header; outproxy authentication must be configured in I2PTunnelHTTPClient // see e.g. http://blog.c22.cc/2013/03/11/privoxy-proxy-authentication-credential-exposure-cve-2013-2503/ @@ -203,6 +289,9 @@ private void writeHeader() throws IOException { } else if ("content-encoding".equals(lcKey)) { // save for compress decision on server side _contentEncoding = val.toLowerCase(Locale.US); + } else if ("transfer-encoding".equals(lcKey) && val.toLowerCase(Locale.US).contains("chunked")) { + // save for keepalive decision on client side + chunked = true; } else if ("set-cookie".equals(lcKey)) { String lcVal = val.toLowerCase(Locale.US); if (lcVal.contains("domain=b32.i2p") || @@ -225,19 +314,55 @@ private void writeHeader() throws IOException { lastEnd = i; } } + + // Now make the final keepalive decisions + if (_keepAliveOut) { + // we need one but not both + if ((chunked && _dataExpected >= 0) || + (!chunked && _dataExpected < 0)) + _keepAliveOut = false; + } + if (_keepAliveIn) { + // we need one but not both + if ((chunked && _dataExpected >= 0) || + (!chunked && _dataExpected < 0)) + _keepAliveIn = false; + } - if (!connectionSent) + if (!connectionSent && !_keepAliveOut) out.write(CONNECTION_CLOSE); finishHeaders(); boolean shouldCompress = shouldCompress(); if (_log.shouldInfo()) - _log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress); + _log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress + " keepaliveIn? " + _keepAliveIn + " keepaliveOut? " + _keepAliveOut); if (data.length == CACHE_SIZE) _cache.release(_headerBuffer); _headerBuffer = null; + + // Setup the keepalive streams + // Until we have keepalive for the i2p socket, the client side + // does not need to do this, we just wait for the socket to close. + // Until we have keepalive for the server socket, the server side + // does not need to do this, we just wait for the socket to close. + if (_keepAliveIn && !shouldCompress) { + if (_dataExpected > 0) { + // content-length + // filter output stream to count the data + out = new ByteLimitOutputStream(out, _callback, _dataExpected); + } else if (_dataExpected == 0) { + if (_callback != null) + _callback.streamDone(); + } else { + // -1, chunked + // filter output stream to look for the end + // do not strip the chunking; pass it through + out = new DechunkedOutputStream(out, _callback, false); + } + } + if (shouldCompress) { beginProcessing(); } @@ -252,7 +377,8 @@ protected void finishHeaders() throws IOException { @Override public void close() throws IOException { if (_log.shouldInfo()) - _log.info("Closing " + out + " compressed? " + shouldCompress(), new Exception("I did it")); + _log.info("Closing " + out + " headers written? " + _headerWritten + " compressed? " + shouldCompress() + + " keepaliveIn? " + _keepAliveIn + " keepaliveOut? " + _keepAliveOut, new Exception("I did it")); synchronized(this) { // synch with changing out field below super.close(); @@ -260,7 +386,7 @@ public void close() throws IOException { } protected void beginProcessing() throws IOException { - OutputStream po = new GunzipOutputStream(out); + OutputStream po = new GunzipOutputStream(out, _callback); synchronized(this) { out = po; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index 589d5b0799..a5c6506391 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -40,6 +40,7 @@ import net.i2p.util.ConvertToHash; import net.i2p.util.DNSOverHTTPS; import net.i2p.util.EventDispatcher; +import net.i2p.util.InternalSocket; import net.i2p.util.Log; import net.i2p.util.PortMapper; @@ -98,6 +99,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn // overrides private static final String PROP_UA_I2P = "httpclient.userAgent.i2p"; private static final String PROP_UA_CLEARNET = "httpclient.userAgent.outproxy"; + public static final String OPT_KEEPALIVE_BROWSER = "keepalive.browser"; + public static final String OPT_KEEPALIVE_I2P = "keepalive.i2p"; + + // how long to wait for another request on the same socket + // Firefox timeout appears to be about 114 seconds, so it will close before we do. + static final int BROWSER_KEEPALIVE_TIMEOUT = 2*60*1000; + private static final boolean DEFAULT_KEEPALIVE_BROWSER = true; + private static final boolean DEFAULT_KEEPALIVE_I2P = true; /** * These are backups if the xxx.ht error page is missing. @@ -396,11 +405,39 @@ protected void clientConnectionRun(Socket s) { String currentProxy = null; long requestId = __requestId.incrementAndGet(); boolean shout = false; + boolean isConnect = false; + boolean isHead = false; I2PSocket i2ps = null; try { s.setSoTimeout(INITIAL_SO_TIMEOUT); out = s.getOutputStream(); InputReader reader = new InputReader(s.getInputStream()); + int requestCount = 0; + // HTTP Persistent Connections (RFC 2616) + // for the local browser-to-client-proxy socket. + // Keep it very simple. + // Will be set to false for non-GET/HEAD, non-HTTP/1.1, + // Connection: close, InternalSocket, + // or after analysis of the response headers in HTTPResponseOutputStream, + // or on errors in I2PTunnelRunner. + boolean keepalive = getBooleanOption(OPT_KEEPALIVE_BROWSER, DEFAULT_KEEPALIVE_BROWSER) && + !(s instanceof InternalSocket); + + // indent + do { // while (keepalive) + // indent + + if (requestCount > 0) { + try { + s.setSoTimeout(BROWSER_KEEPALIVE_TIMEOUT); + } catch (IOException ioe) { + if (_log.shouldInfo()) + _log.info("Socket closed before request #" + requestCount); + return; + } + if (_log.shouldInfo()) + _log.info("Keepalive, awaiting request #" + requestCount); + } String line, method = null, protocol = null, host = null, destination = null; String hostLowerCase = null; StringBuilder newRequest = new StringBuilder(); @@ -422,10 +459,10 @@ protected void clientConnectionRun(Socket s) { String lowercaseLine = line.toLowerCase(Locale.US); - if(method == null) { // first line (GET /base64/realaddr) - if(_log.shouldLog(Log.DEBUG)) { - _log.debug(getPrefix(requestId) + "First line [" + line + "]"); - } + if(method == null) { + // first line GET/POST/etc. + if (_log.shouldInfo()) + _log.info(getPrefix(requestId) + "req #" + requestCount + " first line [" + line + "]"); String[] params = DataHelper.split(line, " ", 3); if(params.length != 3) { @@ -472,12 +509,19 @@ protected void clientConnectionRun(Socket s) { ****/ } - method = params[0]; - if (method.toUpperCase(Locale.US).equals("CONNECT")) { + method = params[0].toUpperCase(Locale.US); + if (method.equals("HEAD")) { + isHead = true; + } else if (method.equals("CONNECT")) { // this makes things easier later, by spoofing a // protocol so the URI parser find the host and port // For in-net outproxy, will be fixed up below request = "https://" + request + '/'; + isConnect = true; + keepalive = false; + } else if (!method.equals("GET")) { + // POST, PUT, ... + keepalive = false; } // Now use the Java URI parser @@ -559,6 +603,8 @@ protected void clientConnectionRun(Socket s) { } String protocolVersion = params[2]; + if (!protocolVersion.equals("HTTP/1.1")) + keepalive = false; protocol = requestURI.getScheme(); host = requestURI.getHost(); @@ -641,8 +687,7 @@ protected void clientConnectionRun(Socket s) { break; } ******/ - } else if ("https".equals(protocol) || - method.toUpperCase(Locale.US).equals("CONNECT")) { + } else if ("https".equals(protocol) || isConnect) { remotePort = 443; } else { remotePort = 80; @@ -806,19 +851,21 @@ protected void clientConnectionRun(Socket s) { host = getHostName(addressHelper); } - // now strip everything but path and query from URI targetRequest = requestURI.toASCIIString(); - String newURI = requestURI.getRawPath(); - if(query != null) { - newURI += '?' + query; - } - try { - requestURI = new URI(newURI); - } catch(URISyntaxException use) { - // shouldnt happen - _log.warn(request, use); - method = null; - break; + if (!isConnect) { + // now strip everything but path and query from URI + String newURI = requestURI.getRawPath(); + if(query != null) { + newURI += '?' + query; + } + try { + requestURI = new URI(newURI); + } catch(URISyntaxException use) { + // shouldnt happen + _log.warn(request, use); + method = null; + break; + } } // end of (host endsWith(".i2p")) @@ -844,8 +891,7 @@ protected void clientConnectionRun(Socket s) { int rPort = requestURI.getPort(); if (rPort > 0) remotePort = rPort; - else if ("https".equals(protocol) || - method.toUpperCase(Locale.US).equals("CONNECT")) + else if ("https".equals(protocol) || isConnect) remotePort = 443; else remotePort = 80; @@ -864,8 +910,7 @@ else if ("https".equals(protocol) || if(_log.shouldLog(Log.DEBUG)) { _log.debug("Before selecting outproxy for " + host); } - if ("https".equals(protocol) || - method.toUpperCase(Locale.US).equals("CONNECT")) + if ("https".equals(protocol) || isConnect) currentProxy = selectSSLProxy(hostLowerCase); else currentProxy = selectProxy(hostLowerCase); @@ -921,16 +966,22 @@ else if ("https".equals(protocol) || break; } - if (method.toUpperCase(Locale.US).equals("CONNECT")) { + if (isConnect) { // fix up the change to requestURI above to get back to the original host:port - line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion; + if (usingInternalOutproxy || usingWWWProxy) + line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion; + else + line = method + ' ' + host + ':' + remotePort + ' ' + protocolVersion; } else { line = method + ' ' + requestURI.toASCIIString() + ' ' + protocolVersion; } if(_log.shouldLog(Log.DEBUG)) { + _log.debug(getPrefix(requestId) + "REQ : \"" + request + "\""); + _log.debug(getPrefix(requestId) + "REQURI: \"" + requestURI + "\""); _log.debug(getPrefix(requestId) + "NEWREQ: \"" + line + "\""); _log.debug(getPrefix(requestId) + "HOST : \"" + host + "\""); + _log.debug(getPrefix(requestId) + "RPORT : \"" + remotePort + "\""); _log.debug(getPrefix(requestId) + "DEST : \"" + destination + "\""); } @@ -941,11 +992,22 @@ else if ("https".equals(protocol) || if (lowercaseLine.contains("upgrade")) { // pass through for websocket preserveConnectionHeader = true; + keepalive = false; + } else if (lowercaseLine.contains("keep-alive")) { + // pass through + if (!keepalive) + continue; + // pass through + preserveConnectionHeader = true; } else { + if (lowercaseLine.contains("close")) + keepalive = false; continue; } } else if (lowercaseLine.startsWith("keep-alive: ") || lowercaseLine.startsWith("proxy-connection: ")) { + if (lowercaseLine.contains("close")) + keepalive = false; continue; } else if (lowercaseLine.startsWith("host: ") && !usingWWWProxy && !usingInternalOutproxy) { // Note that we only pass the original Host: line through to the outproxy @@ -1053,8 +1115,7 @@ else if ("https".equals(protocol) || if(ok != null) { gzip = Boolean.parseBoolean(ok); } - if(gzip && !usingInternalServer && - !method.toUpperCase(Locale.US).equals("CONNECT")) { + if(gzip && !usingInternalServer && !isConnect) { // according to rfc2616 s14.3, this *should* force identity, even if // an explicit q=0 for gzip doesn't. tested against orion.i2p, and it // seems to work. @@ -1063,7 +1124,7 @@ else if ("https".equals(protocol) || if (!usingInternalOutproxy) newRequest.append("X-Accept-Encoding: x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0\r\n"); } - if(!shout && !method.toUpperCase(Locale.US).equals("CONNECT")) { + if(!shout && !isConnect) { if(!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_USER_AGENT))) { // let's not advertise to external sites that we are from I2P String ua; @@ -1110,12 +1171,16 @@ else if ("https".equals(protocol) || } } // end header processing - if(_log.shouldLog(Log.DEBUG)) { - _log.debug(getPrefix(requestId) + "NewRequest header: [" + newRequest.toString() + "]"); - } + if (newRequest.length() > 0 && _log.shouldDebug()) + _log.debug(getPrefix(requestId) + "NewRequest header: [" + newRequest + ']'); if(method == null || (destination == null && !usingInternalOutproxy)) { - //l.log("No HTTP method found in the request."); + if (requestCount > 0) { + // SocketTimeout, normal to get here for persistent connections, + // because DataHelper.readLine() returns null on EOF + return; + } + _log.debug("No HTTP method found in the request."); try { if (protocol != null && "http".equals(protocol.toLowerCase(Locale.US))) { out.write(getErrorPage("denied", ERR_REQUEST_DENIED).getBytes("UTF-8")); @@ -1134,6 +1199,7 @@ else if ("https".equals(protocol) || } // Authorization + // Yes, this is sent and checked for every request on a persistent connection AuthResult result = authorize(s, requestId, method, authorization); if (result != AuthResult.AUTH_GOOD) { if(_log.shouldLog(Log.WARN)) { @@ -1175,7 +1241,7 @@ else if ("https".equals(protocol) || OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId); byte[] data; byte[] response; - if (method.toUpperCase(Locale.US).equals("CONNECT")) { + if (isConnect) { data = null; response = SUCCESS_RESPONSE.getBytes("UTF-8"); } else { @@ -1320,7 +1386,7 @@ else if ("https".equals(protocol) || } // as of 0.9.35, allowInternalSSL defaults to true, and overridden to true unless PROP_SSL_SET is set - if (method.toUpperCase(Locale.US).equals("CONNECT") && + if (isConnect && !usingWWWProxy && getTunnel().getClientOptions().getProperty(PROP_SSL_SET) != null && !Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_INTERNAL_SSL, "true"))) { @@ -1368,33 +1434,51 @@ else if ("https".equals(protocol) || return; } - Properties opts = new Properties(); - //opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000); - // 1 == disconnect. see ConnectionOptions in the new streaming lib, which i - // dont want to hard link to here - //opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1); - I2PSocketOptions sktOpts; - try { - sktOpts = getDefaultOptions(opts); - } catch (RuntimeException re) { - // tunnel build failure - StringBuilder buf = new StringBuilder(128); - buf.append("HTTP/1.1 503 Service Unavailable"); - if (re.getMessage() != null) - buf.append(" - ").append(re.getMessage()); - buf.append("\r\n\r\n"); + // Close persistent I2PSocket if destination or port changes + // and open a new one. + // We do not maintain a pool of open I2PSockets or look for + // an available one. Keep it very simple. + // As long as the traffic keeps going to the same place + // we will keep reusing it. + // While we should be able to reuse it if only the port changes, + // that should be extremely rare, so don't bother. + // For common use patterns including outproxy use, + // this should still be quite effective. + if (i2ps == null || i2ps.isClosed() || + remotePort != i2ps.getPort() || + !clientDest.equals(i2ps.getPeerDestination())) { + if (i2ps != null) { + if (_log.shouldInfo()) + _log.info("Old socket closed or different dest/port, opening new one"); + try { i2ps.close(); } catch (IOException ioe) {} + } + Properties opts = new Properties(); + //opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000); + // 1 == disconnect. see ConnectionOptions in the new streaming lib, which i + // dont want to hard link to here + //opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1); + I2PSocketOptions sktOpts; try { - out.write(buf.toString().getBytes("UTF-8")); - } catch (IOException ioe) {} - throw re; + sktOpts = getDefaultOptions(opts); + } catch (RuntimeException re) { + // tunnel build failure + StringBuilder buf = new StringBuilder(128); + buf.append("HTTP/1.1 503 Service Unavailable"); + if (re.getMessage() != null) + buf.append(" - ").append(re.getMessage()); + buf.append("\r\n\r\n"); + try { + out.write(buf.toString().getBytes("UTF-8")); + } catch (IOException ioe) {} + throw re; + } + if (remotePort > 0) + sktOpts.setPort(remotePort); + i2ps = createI2PSocket(clientDest, sktOpts); } - if (remotePort > 0) - sktOpts.setPort(remotePort); - i2ps = createI2PSocket(clientDest, sktOpts); - boolean isConnect = method.toUpperCase(Locale.US).equals("CONNECT"); - OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, - currentProxy, requestId, hostLowerCase, isConnect); + I2PTunnelRunner t; + I2PTunnelHTTPClientRunner hrunner = null; if (isConnect) { byte[] data; byte[] response; @@ -1409,7 +1493,12 @@ else if ("https".equals(protocol) || t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, (OnTimeout) null); } else { byte[] data = newRequest.toString().getBytes("ISO-8859-1"); - t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout); + OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, + currentProxy, requestId, hostLowerCase, isConnect); + boolean keepaliveI2P = keepalive && getBooleanOption(OPT_KEEPALIVE_I2P, DEFAULT_KEEPALIVE_I2P); + hrunner = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout, + keepaliveI2P, keepalive, isHead); + t = hrunner; } if (usingWWWProxy) { t.setSuccessCallback(new OnProxySuccess(currentProxy, hostLowerCase, isConnect)); @@ -1417,7 +1506,26 @@ else if ("https".equals(protocol) || // we are called from an unlimited thread pool, so run inline //t.start(); t.run(); + + // I2PTunnelHTTPClientRunner spins off the browser-to-i2p thread and keeps + // the i2p-to-socket copier in-line. So we won't get here until the i2p socket is closed. + // check if whatever was in the response does not allow keepalive + if (keepalive && hrunner != null && !hrunner.getKeepAliveSocket()) + break; + // The old I2P socket was closed, null it out so we'll get a new one + // next time around + if (hrunner != null && !hrunner.getKeepAliveI2P()) + i2ps = null; + // go around again + requestCount++; + + // indent + } while (keepalive); + // indent + } catch(IOException ex) { + // This is normal for keepalive when the browser closed the socket, + // or a SocketTimeoutException if we gave up first if(_log.shouldLog(Log.INFO)) { _log.info(getPrefix(requestId) + "Error trying to connect", ex); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java index 3d8919e52e..7527af3570 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java @@ -8,8 +8,10 @@ import java.io.OutputStream; import java.net.Socket; import java.util.List; +import java.util.concurrent.RejectedExecutionException; import net.i2p.client.streaming.I2PSocket; +import net.i2p.util.I2PAppThread; /** * Override the response with a stream filtering the HTTP headers @@ -23,53 +25,186 @@ * Warning - not maintained as a stable API for external use. */ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner { + private HTTPResponseOutputStream _hout; + private final boolean _isHead; /** * Does NOT start itself. Caller must call start(). + * + * @deprecated use other constructor */ + @Deprecated public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, FailCallback onFail) { super(s, i2ps, slock, initialI2PData, null, sockList, onFail); + _isHead = false; + } + + /** + * Does NOT start itself. Caller must call start(). + * + * @param allowKeepAliveI2P we may, but are not required to, keep the I2P socket alive + * - Requires allowKeepAliveSocket + * @param allowKeepAliveSocket we may, but are not required to, keep the browser-side socket alive + * NO data will be forwarded from the socket to the i2psocket other than + * initialI2PData if this is true. + * @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4) + * @since 0.9.62 + */ + public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + List sockList, FailCallback onFail, + boolean allowKeepAliveI2P, + boolean allowKeepAliveSocket, boolean isHead) { + super(s, i2ps, slock, initialI2PData, null, sockList, onFail, allowKeepAliveI2P, allowKeepAliveSocket); + if (allowKeepAliveI2P && !allowKeepAliveSocket) + throw new IllegalArgumentException(); + _isHead = isHead; } /** * Only call once! + * + * @return an HTTPResponseOutputStream + * @throws IllegalStateException if called again */ @Override protected OutputStream getSocketOut() throws IOException { + if (_hout != null) + throw new IllegalStateException("already called"); OutputStream raw = super.getSocketOut(); - return new HTTPResponseOutputStream(raw); + _hout = new HTTPResponseOutputStream(raw, super.getKeepAliveI2P(), super.getKeepAliveSocket(), _isHead, this); + return _hout; + } + + /** + * Should we keep the local browser socket open when done? + * @since 0.9.62 + */ + @Override + boolean getKeepAliveSocket() { + return _hout != null && _hout.getKeepAliveOut() && super.getKeepAliveSocket(); } /** - * Why is this overridden? - * Why flush in super but not here? - * Why do things in different order than in super? + * Should we keep the I2P socket open when done? + * @since 0.9.62 + */ + @Override + boolean getKeepAliveI2P() { + return _hout != null && _hout.getKeepAliveIn() && super.getKeepAliveI2P(); + } + + /** + * May not actually close either socket, depending on keepalive settings. + * + * @param out may be null + * @param in may be null + * @param i2pout may be null + * @param i2pin may be null + * @param s non-null + * @param i2ps non-null + * @param t1 may be null + * @param t2 may be null, ignored, we only join t1 */ @Override protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException { - if (i2pin != null) { try { - i2pin.close(); - } catch (IOException ioe) {} } - if (i2pout != null) { try { - i2pout.close(); - } catch (IOException ioe) {} } - if (in != null) { try { - in.close(); - } catch (IOException ioe) {} } + boolean keepaliveSocket = getKeepAliveSocket(); + boolean keepaliveI2P = getKeepAliveI2P(); + boolean threadI2PClose = keepaliveSocket && !keepaliveI2P && i2pout != null && !i2ps.isClosed(); + if (_log.shouldInfo()) + _log.info("Closing HTTPClientRunner keepaliveI2P? " + keepaliveI2P + " keepaliveSocket? " + keepaliveSocket + + " threadedClose? " + threadI2PClose, new Exception("I did it")); + if (threadI2PClose) { + // Thread the I2P stream/socket closing, because it is blocking, may take several seconds, + // and we don't want to delay the next request + Thread t = new I2PSocketCloser(i2pin, i2pout, i2ps); + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + try { + tcg.getClientExecutor().execute(t); + } catch (RejectedExecutionException ree) {} + } else { + t.start(); + } + } else { + if (!keepaliveI2P) { + if (i2pin != null) { try { + i2pin.close(); + } catch (IOException ioe) {} } + } + if (i2pout != null) { try { + if (keepaliveI2P) + i2pout.flush(); + else + i2pout.close(); + } catch (IOException ioe) {} } + } + + if (!keepaliveSocket) { + if (in != null) { try { + in.close(); + } catch (IOException ioe) {} } + } + if (out != null) { try { - out.close(); + if (keepaliveSocket) + out.flush(); + else + out.close(); } catch (IOException ioe) {} } - try { - i2ps.close(); - } catch (IOException ioe) {} - try { - s.close(); - } catch (IOException ioe) {} + + if (!threadI2PClose && !keepaliveI2P) { + try { + i2ps.close(); + } catch (IOException ioe) {} + } + + if (!keepaliveSocket) { + try { + s.close(); + } catch (IOException ioe) {} + } if (t1 != null) t1.join(30*1000); - // t2 = fromI2P now run inline - //t2.join(30*1000); + } + + /** + * Thread the I2P socket close, so we don't hold up + * the next request if the browser socket is keepalive. + * + * @since 0.9.xx + */ + private class I2PSocketCloser extends I2PAppThread { + private final InputStream in; + private final OutputStream out; + private final I2PSocket s; + + /** + * @param in may be null + * @param out non-null + * @param i2ps non-null + */ + public I2PSocketCloser(InputStream i2pin, OutputStream i2pout, I2PSocket i2ps) { + in = i2pin; + out = i2pout; + s = i2ps; + } + + @Override + public void run() { + if (in != null) { + try { + in.close(); + } catch (IOException ioe) {} + } + try { + out.close(); + } catch (IOException ioe) {} + try { + s.close(); + } catch (IOException ioe) {} + //_log.info("(threaded) i2p socket closed"); + } } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index c08619c0ac..8218e0f6f3 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -20,6 +20,7 @@ import net.i2p.client.streaming.I2PSocketException; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; +import net.i2p.i2ptunnel.util.LimitOutputStream.DoneCallback; import net.i2p.util.ByteCache; import net.i2p.util.Clock; import net.i2p.util.I2PAppThread; @@ -27,11 +28,14 @@ import net.i2p.util.Log; /** - * A thread that starts two more threads, one to forward traffic in each direction. + * A thread that starts one more thread if keepAliveSocket is false, + * to forward traffic in each direction. + * When keepAliveSocket is true, we do not expect additional data and do not + * need a forwarding thread from the socket to I2P. * * Warning - not maintained as a stable API for external use. */ -public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener { +public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener, DoneCallback { protected final Log _log; private static final AtomicLong __runnerId = new AtomicLong(); @@ -52,8 +56,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr private volatile boolean finished; private final byte[] initialI2PData; private final byte[] initialSocketData; - /** when the last data was sent/received (or -1 if never) */ - private long lastActivityOn; /** when the runner started up */ private final long startedOn; private final List sockList; @@ -65,6 +67,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr private long totalSent; // does not include initialSocketData private long totalReceived; + // not final, may be changed by extending classes + protected volatile boolean _keepAliveI2P, _keepAliveSocket; + private StreamForwarder toI2P; + private StreamForwarder fromI2P; /** * For use in new constructor @@ -166,6 +172,29 @@ public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2P this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false); } + /** + * With keepAlive args. Does NOT start itself. Caller must call start(). + * + * @param slock the socket lock, non-null + * @param initialI2PData may be null + * @param initialSocketData may be null + * @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion. + * Will synchronize on slock when removing. + * @param onFail May be null. If non-null and no data (except initial data) was received, + * it will be run before closing s. + * @param keepAliveI2P Do not close the I2P socket when done. + * @param keepAliveSocket Do not close the local socket when done. + * For client side only; must be false for server side. + * NO data will be forwarded from the socket to the i2psocket other than + * initialI2PData if this is true. + * @since 0.9.62 + */ + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + byte[] initialSocketData, List sockList, FailCallback onFail, + boolean keepAliveI2P, boolean keepAliveSocket) { + this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, keepAliveI2P, keepAliveSocket, false); + } + /** * Base constructor * @@ -182,6 +211,33 @@ public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2P private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList, Runnable onTimeout, FailCallback onFail, boolean shouldStart) { + this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false, false, shouldStart); + } + + /** + * Base constructor with keepAlive args + * + * @param slock the socket lock, non-null + * @param initialI2PData may be null + * @param initialSocketData may be null + * @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion. + * Will synchronize on slock when removing. + * @param onTimeout May be null. If non-null and no data (except initial data) was received, + * it will be run before closing s. + * @param onFail Trumps onTimeout + * @param shouldStart should thread be started in constructor (bad, false recommended) + * @param keepAliveI2P Do not close the I2P socket when done. + * @param keepAliveSocket Do not close the local socket when done. + * For client side only; must be false for server side. + * NO data will be forwarded from the socket to the i2psocket other than + * initialI2PData if this is true. + * @since 0.9.62 + */ + private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + byte[] initialSocketData, List sockList, Runnable onTimeout, + FailCallback onFail, + boolean keepAliveI2P, boolean keepAliveSocket, + boolean shouldStart) { this.sockList = sockList; this.s = s; this.i2ps = i2ps; @@ -190,15 +246,17 @@ private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2 this.initialSocketData = initialSocketData; this.onTimeout = onTimeout; _onFail = onFail; - lastActivityOn = -1; startedOn = Clock.getInstance().now(); _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + _keepAliveI2P = keepAliveI2P; + _keepAliveSocket = keepAliveSocket; if (_log.shouldLog(Log.INFO)) _log.info("I2PTunnelRunner started"); _runnerId = __runnerId.incrementAndGet(); - setName("I2PTunnelRunner " + _runnerId); - if (shouldStart) + if (shouldStart) { + setName("I2PTunnelRunner " + _runnerId); start(); + } } /** @@ -221,14 +279,8 @@ public boolean isFinished() { */ @Deprecated public long getLastActivityOn() { - return lastActivityOn; - } - -/**** - private void updateActivity() { - lastActivityOn = Clock.getInstance().now(); + return -1L; } -****/ /** * When this runner started up transferring data @@ -251,6 +303,50 @@ public void setSuccessCallback(SuccessCallback sc) { protected InputStream getSocketIn() throws IOException { return s.getInputStream(); } protected OutputStream getSocketOut() throws IOException { return s.getOutputStream(); } + + /** + * Should we keep the I2P socket open when done? + * On the client side, only true if the browser and the server side support it. + * On the server side, only true if the client supports it. + * @since 0.9.62 + */ + boolean getKeepAliveI2P() { + return _keepAliveI2P; + } + + /** + * Should we keep the local browser/server socket open when done? + * Usually true for client side. + * Always false for server side. + * @since 0.9.62 + */ + boolean getKeepAliveSocket() { + return _keepAliveSocket; + } + + /** + * The DoneCallback for the I2P socket. + * + * @since 0.9.62 + */ + public void streamDone() { + if (_keepAliveSocket && fromI2P != null) { + // we are client-side + // tell the from-I2P runner + if (_log.shouldInfo()) + _log.info("Stream done from I2P", new Exception("I did it")); + fromI2P.done = true; + } else if (_keepAliveI2P && toI2P != null) { + // we are server-side + // tell the to-I2P runner + if (_log.shouldInfo()) + _log.info("Stream done from Server", new Exception("I did it")); + toI2P.done = true; + } else { + if (_log.shouldWarn()) + _log.info("Unexpected stream done", new Exception("I did it")); + } + } private static final byte[] POST = { 'P', 'O', 'S', 'T', ' ' }; private static final byte[] PUT = { 'P', 'U', 'T', ' ' }; @@ -263,10 +359,7 @@ public void run() { OutputStream out = null; InputStream i2pin = null; OutputStream i2pout = null; - StreamForwarder toI2P = null; - StreamForwarder fromI2P = null; try { - in = getSocketIn(); out = getSocketOut(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); // unimplemented in streaming //i2ps.setSocketErrorListener(this); @@ -299,15 +392,25 @@ public void run() { // this does not increment totalReceived out.write(initialSocketData); } - if (_log.shouldLog(Log.DEBUG)) + if (_log.shouldLog(Log.DEBUG)) { _log.debug("Initial data " + (initialI2PData != null ? initialI2PData.length : 0) + " written to I2P, " + (initialSocketData != null ? initialSocketData.length : 0) + " written to the socket, starting forwarders"); - if (!(s instanceof InternalSocket)) - in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE); - toI2P = new StreamForwarder(in, i2pout, true, null); + + } + if (_keepAliveSocket) { + // standard GET or HEAD, no data, do not thread a forwarder + // because we don't need it and + // we don't want it to swallow the next request + } else { + in = getSocketIn(); + // InternalSocket already has buffering + if (!(s instanceof InternalSocket)) + in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE); + toI2P = new StreamForwarder(in, i2pout, true, null); + toI2P.start(); + } fromI2P = new StreamForwarder(i2pin, out, false, _onSuccess); - toI2P.start(); // We are already a thread, so run the second one inline //fromI2P.start(); fromI2P.run(); @@ -330,7 +433,7 @@ public void run() { // HTTPClient never sets initialSocketData. if (_onFail != null) { Exception e = fromI2P.getFailure(); - if (e == null) + if (e == null && toI2P != null) e = toI2P.getFailure(); _onFail.onFail(e); } else { @@ -339,7 +442,7 @@ public void run() { } else { // Detect a reset on one side, and propagate to the other Exception e1 = fromI2P.getFailure(); - Exception e2 = toI2P.getFailure(); + Exception e2 = toI2P != null ? toI2P.getFailure() : null; Throwable c1 = e1 != null ? e1.getCause() : null; Throwable c2 = e2 != null ? e2.getCause() : null; if (c1 != null && c1 instanceof I2PSocketException) { @@ -365,11 +468,17 @@ public void run() { } catch (InterruptedException ex) { if (_log.shouldLog(Log.ERROR)) _log.error("Interrupted", ex); + _keepAliveI2P = false; + _keepAliveSocket = false; } catch (SSLException she) { _log.error("SSL error", she); + _keepAliveI2P = false; + _keepAliveSocket = false; } catch (IOException ex) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Error forwarding", ex); + _keepAliveI2P = false; + _keepAliveSocket = false; } catch (IllegalStateException ise) { // JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp) //java.nio.channels.NotYetConnectedException @@ -384,9 +493,13 @@ public void run() { // at net.i2p.i2ptunnel.I2PTunnelRunner.run(I2PTunnelRunner.java:167) if (_log.shouldLog(Log.WARN)) _log.warn("gnu?", ise); + _keepAliveI2P = false; + _keepAliveSocket = false; } catch (RuntimeException e) { if (_log.shouldLog(Log.ERROR)) _log.error("Internal error", e); + _keepAliveI2P = false; + _keepAliveSocket = false; } finally { removeRef(); if (i2pReset) { @@ -401,6 +514,8 @@ public void run() { try { i2ps.close(); } catch (IOException ioe) {} + _keepAliveI2P = false; + _keepAliveSocket = false; } else if (sockReset) { if (_log.shouldWarn()) _log.warn("Got socket reset, resetting I2P socket"); @@ -410,6 +525,8 @@ public void run() { try { s.close(); } catch (IOException ioe) {} + _keepAliveI2P = false; + _keepAliveSocket = false; } else { // now one connection is dead - kill the other as well, after making sure we flush try { @@ -418,14 +535,18 @@ public void run() { } } } - + /** + * Warning - overridden in I2PTunnelHTTPClientRunner. + * Here we ignore keepalive and always close both sides. + * The HTTP flavor handles keepalive. + * * @param out may be null * @param in may be null * @param i2pout may be null * @param i2pin may be null * @param t1 may be null - * @param t2 may be null + * @param t2 may be null, ignored, we only join t1 */ protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException { @@ -451,20 +572,20 @@ protected void close(OutputStream out, InputStream in, OutputStream i2pout, Inpu } catch (IOException ioe) {} if (t1 != null) t1.join(30*1000); - // t2 = fromI2P now run inline - //t2.join(30*1000); } /** * Deprecated, unimplemented in streaming, never called. + * @deprecated unused */ + @Deprecated public void errorOccurred() { synchronized (finishLock) { finished = true; finishLock.notifyAll(); } } - + private void removeRef() { if (sockList != null) { synchronized (slock) { @@ -472,7 +593,7 @@ private void removeRef() { } } } - + /** * Forward data in one direction */ @@ -485,6 +606,8 @@ private class StreamForwarder extends I2PAppThread { private final ByteCache _cache; private final SuccessCallback _callback; private volatile Exception _failure; + // does not need to be volatile, will be set from same thread + public boolean done; /** * Does not start itself. Caller must start() @@ -497,7 +620,8 @@ public StreamForwarder(InputStream in, OutputStream out, boolean toI2P, SuccessC _callback = cb; direction = (toI2P ? "toI2P" : "fromI2P"); _cache = ByteCache.getInstance(32, NETWORK_BUFFER_SIZE); - setName("StreamForwarder " + _runnerId + '.' + direction); + if (toI2P) + setName("StreamForwarder " + _runnerId + '.' + direction); } @Override @@ -510,15 +634,11 @@ public void run() { + from + " and " + to); } - // boo, hiss! shouldn't need this - the streaming lib should be configurable, but - // somehow the inactivity timer is sometimes failing to get triggered properly - //i2ps.setReadTimeout(2*60*1000); - ByteArray ba = _cache.acquire(); - byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE]; + byte[] buffer = ba.getData(); try { int len; - while ((len = in.read(buffer)) != -1) { + while (!done && (len = in.read(buffer)) != -1) { if (len > 0) { out.write(buffer, 0, len); if (_toI2P) { @@ -583,15 +703,28 @@ public void run() { _failure = ex; } finally { _cache.release(ba); + boolean keepAliveFrom, keepAliveTo; + if (_toI2P) { + keepAliveFrom = _keepAliveSocket; + keepAliveTo = _keepAliveI2P; + } else { + keepAliveFrom = _keepAliveI2P; + keepAliveTo = _keepAliveSocket; + } if (_log.shouldLog(Log.INFO)) { - _log.info(direction + ": done forwarding between " - + from + " and " + to); + _log.info(direction + ": done forwarding from " + + from + " to " + to + + " keepalive from? " + keepAliveFrom + + " keepalive to? " + keepAliveTo + + " bytes: " + (_toI2P ? totalSent : totalReceived)); } - try { - in.close(); - } catch (IOException ex) { - if (_log.shouldLog(Log.WARN)) - _log.warn(direction + ": Error closing input stream", ex); + if (!keepAliveFrom) { + try { + in.close(); + } catch (IOException ex) { + if (_log.shouldLog(Log.WARN)) + _log.warn(direction + ": Error closing input stream", ex); + } } try { // Thread must close() before exiting for a PipedOutputStream, @@ -601,10 +734,17 @@ public void run() { // DON'T close if we have a timeout job and we haven't received anything, // or else the timeout job can't write the error message to the stream. // close() above will close it after the timeout job is run. - if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0)) - out.close(); - else if (_log.shouldLog(Log.INFO)) - _log.info(direction + ": not closing so we can write the error message"); + if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0)) { + if (keepAliveTo) + out.flush(); + else + out.close(); + } else { + if (_log.shouldInfo()) + _log.info(direction + ": not closing so we can write the error message"); + if (keepAliveTo) + out.flush(); + } } catch (IOException ioe) { if (_log.shouldLog(Log.DEBUG)) _log.debug(direction + ": Error flushing to close", ioe); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java index b59567ff89..fdff730230 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java @@ -3,6 +3,8 @@ */ package net.i2p.i2ptunnel; +import java.util.Properties; + import net.i2p.client.I2PSession; import net.i2p.util.EventDispatcher; import net.i2p.util.EventDispatcherImpl; @@ -111,6 +113,17 @@ public void disconnected(I2PSession session) { getTunnel().removeSession(session); } + /** + * @since 0.9.62 + */ + protected boolean getBooleanOption(String opt, boolean dflt) { + Properties opts = getTunnel().getClientOptions(); + String o = opts.getProperty(opt); + if (o != null) + return Boolean.parseBoolean(o); + return dflt; + } + /** * Does nothing here. Extending classes may override. */