-
Notifications
You must be signed in to change notification settings - Fork 547
EntityStream
Messages used to be fully buffered. To support streaming, we would like a model where the body of the request or response could be processed one chunk at a time. To some extent, fully buffering the body is a mismatch between R2 API and the APIs of the underlying frameworks/libraries, neither of which requires the body to be fully buffered in memory (they do provide facilities, e.g. HttpObjectAggregator in Netty, to do so if users so desired).
Hence, the main design task is to find a good representation / abstraction of the desired model and update R2 APIs to use that model instead.
There are a few design goals that we would like to achieve.
- The API should be asynchronous. This allows users to freely use this API in asynchronous environment without worrying about blocking the threads.
- The API should not depend on / expose specific features of the underlying frameworks / libraries.
- Back pressure should be supported. That is, slower consumer (server when receiving request and client when receiving response) should be able to apply back pressure to faster producer.
The abstraction we come up with to represent the body of a request or response is EntityStream. It is influenced by Reactive Streams and Servlet API 3.1.
The EntityStream has a Writer on one end and has a Reader on the other hand. It could also have Observers which observes the data flow.
The stream could only be consumed once. Writer is not obliged to keep reference to the data once it's been consumed.
Reader is the one who ultimately decides whether data chunks should be consumed. It signals its intention of consuming more data by using ReadHandle's request() method. Writer, using WriteHandle, can write data chunks to EntityStream as requested. This process is asynchronous (i.e. Writer does not need to write on spot when Reader requested). If Writer has no data at the time, it can write later once the data is produced. When data is written into EntityStream, Reader's callback would be invoked to process the data chunk.
The contract is that Writer cannot write more data chunks than requested by Reader. If the data stream ends, Writer signals successful end by WriteHandle's done() method; otherwise if there is a problem in producing the data, Writer signals error. Observers just observes the data flow (their callbacks would be invoked when new data is written into EntityStream, or done or error events happened), without the means to request more data.
EntityStream API is pull-based, so that ensures the ability to apply back pressure inside a JVM (on the contrary, without this back pressure, for example, Netty would willingly buffer all data chunks you write to it, until OutOfMemoryError is thrown). Across the network, the back pressure is achieved via TCP flow control.
Suppose that there are two EntityStreams (W, O, R, represents Writer, Observer, Reader, respectively) in the figure, one on client side, where the user code is the Writer which is trying to write the data chunks of the request body; the other is on the server side, where the user code is the Reader which is trying to read data chunks of the request body.
Now assume disk IO is busy on the server and write to disk takes a long time. The Reader would not request more data because previous data chunks are yet to be processed. Consequently, Writer would not try to write data into the EntityStream and thus ultimately stops reading data from the socket(client side). Now TCP flow control kicks in and as a result the socket buffer on the client side fills up. Because the it is futile to try writing into the socket, the Reader on client side EntityStream would stop requesting more data. Finally the Writer would stop writing data into the EntityStream.
Quick Access:
- Tutorials
- Dynamic Discovery
- Request Response API
-
Rest.li User Guide
- Rest.li Server
- Resources
- Resource Types
- Resource Methods
- Rest.li Client
- Projections
- Filters
- Wire Protocol
- Data Schemas
- Data
- Unstructured Data
- Tools
- FAQ