
Request Response Framework

jpbetz edited this page Dec 4, 2012 · 12 revisions

R2 is the request / response API in Pegasus. It includes abstractions for REST and RPC requests and responses, filter chains for customized processing, and a transport abstraction. It is divided into two parts: the Pegasus R2 library, which has no dependencies on LinkedIn code and is designed to be open-source ready, and the network R2 library, which integrates R2 with the standard LinkedIn stack.

R2 is intended to be used in conjunction with "D2", our Dynamic Discovery and Software Load Balancing system. (The combined stack can be referred to as R2D2 :).

h2. Layers

The following diagram shows the layers involved in the R2 system. Each layer will be described in detail below.

!https://github.com/linkedin/pegasus/wiki/r2.png!

h2. Requests and Responses

In this section, we describe messages in the R2 system. There are two basic types of messages in R2: RPC and REST. Both share a common base hierarchy, which makes it possible to work with broader message abstractions when appropriate. The base messages, RPC messages, and REST messages are each described in the sections below.

Messages have a few properties that are worth describing here:

  • They are immutable. It is not possible to change a message after it has been created. It is, however, possible to copy a message and make changes using builders, which will be described later.
  • They are thread-safe due to immutability.
  • New messages are created using builders.
  • Existing messages can be copied and modified using builders.
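The immutability-plus-builder contract above can be sketched in plain Java. SketchMessage and its Builder are hypothetical names, not R2 classes, and the sketch stores the entity as a defensively copied byte[] for simplicity, whereas R2 uses ByteString to avoid unnecessary copies.

```java
/** Hypothetical immutable message with a copy-on-build builder; not R2's Message class. */
final class SketchMessage {
  private final byte[] entity;

  private SketchMessage(byte[] entity) {
    // Defensive copy: later changes to the caller's array cannot leak into this message.
    this.entity = entity.clone();
  }

  byte[] getEntity() {
    // Return a copy so callers cannot mutate this message's state.
    return entity.clone();
  }

  /** Starts a builder pre-populated from this message; this message itself is never changed. */
  Builder builder() {
    return new Builder().setEntity(entity);
  }

  static final class Builder {
    private byte[] entity = new byte[0];

    Builder setEntity(byte[] entity) {
      this.entity = entity.clone();
      return this;
    }

    SketchMessage build() {
      return new SketchMessage(entity);
    }
  }
}
```

Copying a message and changing the copy leaves the original untouched, which is what makes sharing messages across threads safe without locking.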

h3. Base Messages

h4. Message

"Message" is the root of the message hierarchy. All messages in R2 contain an entity (which may be empty). For RPC, an entity contains the request or the response data. For REST, the R2 entity is equivalent to a REST entity.

Message has the following interface:

public interface Message
{
  /**
   * Returns the entity for this message.
   *
   * @return the entity for this message
   */
  ByteString getEntity();

  /**
   * Returns a {@link MessageBuilder}, which provides a means of constructing a new message using
   * this message as a starting point. Changes made with the builder are not reflected by this
   * message instance.
   *
   * @return a builder for this message
   */
  MessageBuilder<? extends MessageBuilder<?>> builder();
}

In addition to an entity, all messages provide a builder that can be used to copy the message and modify its copy. In the case of the Message above, the builder is a MessageBuilder.

Messages are subdivided into Requests and Responses. The interfaces for these are described below.

h4. Request

A request has a URI. This provides information to the client about how to direct the request, for example which protocol to use, which server to connect to, and which service to invoke. R2 should (almost) always be used with D2 (Dynamic Discovery), so the URI will typically be a URN, which the Dynamic Discovery system resolves internally.

public interface Request extends Message
{
  /**
   * Returns the URI for this request.
   *
   * @return the URI for this request
   */
  URI getURI();

  /**
   * Returns a {@link RequestBuilder}, which provides a means of constructing a new request using
   * this request as a starting point. Changes made with the builder are not reflected by this
   * request instance. The concrete type (for example {@link com.linkedin.r2.message.rpc.RpcRequest})
   * is preserved when building the new request.
   *
   * @return a builder for this request
   */
  RequestBuilder<? extends RequestBuilder<?>> requestBuilder();
}

h4. Response

Responses provide no additional information beyond that supplied by Message.

public interface Response extends Message
{
  /**
   * Returns a {@link ResponseBuilder}, which provides a means of constructing a new response using
   * this response as a starting point. Changes made with the builder are not reflected by this
   * response instance. The concrete type (for example {@link com.linkedin.r2.message.rpc.RpcResponse})
   * is preserved when building the new response.
   *
   * @return a builder for this response
   */
  ResponseBuilder<? extends ResponseBuilder<?>> responseBuilder();
}
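The builder signatures above use wildcards such as RequestBuilder<? extends RequestBuilder<?>> so that a builder obtained from an abstract Request can still produce the concrete request type. One common way to get that effect is a self-bounded generic builder, sketched below. All class names here are hypothetical, and R2's actual builder hierarchy may be structured differently.

```java
import java.net.URI;

/** Hypothetical self-bounded base builder: B is the concrete builder type. */
abstract class SketchRequestBuilder<B extends SketchRequestBuilder<B>> {
  protected URI uri;

  @SuppressWarnings("unchecked")
  B setURI(URI uri) {
    this.uri = uri;
    // Returning "this" as B lets chained calls keep the concrete builder type.
    return (B) this;
  }

  abstract Object build();
}

/** Hypothetical concrete builder that adds a REST-style method property. */
final class SketchRestRequestBuilder extends SketchRequestBuilder<SketchRestRequestBuilder> {
  private String method = "GET";

  SketchRestRequestBuilder setMethod(String method) {
    this.method = method;
    return this;
  }

  @Override
  SketchRestRequest build() { // covariant return: callers get the concrete type back
    return new SketchRestRequest(uri, method);
  }
}

/** Hypothetical immutable product of the builder. */
final class SketchRestRequest {
  final URI uri;
  final String method;

  SketchRestRequest(URI uri, String method) {
    this.uri = uri;
    this.method = method;
  }
}
```

Because setURI returns SketchRestRequestBuilder rather than the base type, a chain such as setURI(...).setMethod("PUT").build() compiles without casts.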

h3. RPC Messages

Currently RPC messages have no special properties beyond those already present in Message, Request, and Response. For reference, the RPC interfaces are shown below.

h4. RpcMessage

public interface RpcMessage extends Message
{
  /**
   * Returns an {@link RpcMessageBuilder}, which provides a means of constructing a new message using
   * this message as a starting point. Changes made with the builder are not reflected by this
   * message instance.
   *
   * @return a builder for this message
   */
  RpcMessageBuilder<? extends RpcMessageBuilder<?>> rpcBuilder();
}

h4. RpcRequest

public interface RpcRequest extends RpcMessage, Request
{
  /**
   * Returns a {@link RpcRequestBuilder}, which provides a means of constructing a new request using
   * this request as a starting point. Changes made with the builder are not reflected by this
   * request instance.
   *
   * @return a builder for this request
   */
  RpcRequestBuilder builder();
}

h4. RpcResponse

public interface RpcResponse extends RpcMessage, Response
{
  /**
   * Returns a {@link RpcResponseBuilder}, which provides a means of constructing a new response using
   * this response as a starting point. Changes made with the builder are not reflected by this
   * response instance.
   *
   * @return a builder for this response
   */
  RpcResponseBuilder builder();
}

h3. REST Messages

h4. RestMessage

All REST messages add headers in addition to the base message properties. Headers must conform to the definition in "RFC 2616":http://www.ietf.org/rfc/rfc2616.txt (described in section 4.2 and associated sections). The RestMessage interface is:

public interface RestMessage extends Message
{
  /**
   * Gets the value of the header with the given name. If there is no header with the given name
   * then this method returns {@code null}.
   *
   * @param name name of the header
   * @return the value of the header or {@code null} if there is no header with the given name.
   */
  String getHeader(String name);

  /**
   * Returns an unmodifiable view of the headers in this message.
   *
   * @return an unmodifiable view of the headers in this message
   */
  Map<String, String> getHeaders();

  /**
   * Returns a {@link RestMessageBuilder}, which provides a means of constructing a new message using
   * this message as a starting point. Changes made with the builder are not reflected by this
   * message instance.
   *
   * @return a builder for this message
   */
  RestMessageBuilder<? extends RestMessageBuilder<?>> restBuilder();
}
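RFC 2616 section 4.2 states that header field names are case-insensitive. The sketch below shows one way an immutable message could honor that while keeping the getHeader / getHeaders contract above. SketchHeaders is a hypothetical name, and whether R2 itself stores headers this way is an assumption.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical immutable header holder with case-insensitive lookup. */
final class SketchHeaders {
  private final Map<String, String> headers;

  SketchHeaders(Map<String, String> source) {
    // RFC 2616 section 4.2: field names are case-insensitive, so key the map that way.
    TreeMap<String, String> copy = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
    copy.putAll(source);
    this.headers = Collections.unmodifiableMap(copy);
  }

  /** Returns the header value, or null when no header has the given name. */
  String getHeader(String name) {
    return headers.get(name);
  }

  /** Unmodifiable map; any attempt to mutate it throws UnsupportedOperationException. */
  Map<String, String> getHeaders() {
    return headers;
  }
}
```

With this layout, getHeader("content-type") and getHeader("Content-Type") return the same value.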

h4. RestRequest

RestRequests add a method property with standard REST method semantics (typically GET, PUT, POST, or DELETE):

public interface RestRequest extends RestMessage, Request
{
  /**
   * Returns the REST method for this request.
   *
   * @return the REST method for this request
   * @see com.linkedin.r2.message.rest.RestMethod
   */
  String getMethod();

  /**
   * Returns a {@link RestRequestBuilder}, which provides a means of constructing a new request using
   * this request as a starting point. Changes made with the builder are not reflected by this
   * request instance.
   *
   * @return a builder for this request
   */
  @Override
  RestRequestBuilder builder();
}

h4. RestResponse

RestResponses add a status property, which matches the semantics of a REST status code (e.g. 200 - OK, see RFC 2616 for details about HTTP status codes):

public interface RestResponse extends RestMessage, Response
{
  /**
   * Returns the status for this response.
   *
   * @return the status for this response
   * @see com.linkedin.r2.message.rest.RestStatus
   */
  int getStatus();

  /**
   * Returns a {@link RestResponseBuilder}, which provides a means of constructing a new response using
   * this response as a starting point. Changes made with the builder are not reflected by this
   * response instance.
   *
   * @return a builder for this response
   */
  @Override
  RestResponseBuilder builder();
}

h3. ByteStrings

Entities are stored as ByteStrings in R2. A ByteString guarantees that its byte data is immutable and avoids copying the bytes unless absolutely necessary. The public API of the ByteString class looks like the following:

public final class ByteString
{
  /**
   * Returns an empty {@link ByteString}.
   *
   * @return an empty {@link ByteString}
   */
  public static ByteString empty();

  /**
   * Returns a new {@link ByteString} that wraps a copy of the supplied bytes. Changes to the supplied bytes
   * will not be reflected in the returned {@link ByteString}.
   *
   * @param bytes the bytes to copy
   * @return a {@link ByteString} that wraps a copy of the supplied bytes
   * @throws NullPointerException if {@code bytes} is {@code null}.
   */
  public static ByteString copy(byte[] bytes);

  /**
   * Returns a new {@link ByteString} that wraps the bytes generated from the supplied string with the
   * given charset.
   *
   * @param str the string to copy
   * @param charset the charset used to encode the bytes
   * @return a {@link ByteString} that wraps a copy of the supplied bytes
   */
  public static ByteString copyString(String str, Charset charset);

  /**
   * Returns a new {@link ByteString} with bytes read from an {@link InputStream}.
   *
   * If size is zero, then this method will always return the {@link ByteString#empty()},
   * and no bytes will be read from the {@link InputStream}.
   * If size is less than zero, then a {@link NegativeArraySizeException} will be thrown
   * when this method attempts to create an array of negative size.
   *
   * @param inputStream that will provide the bytes.
   * @param size provides the number of bytes to read.
   * @return a ByteString that contains the read bytes.
   * @throws IOException from InputStream if requested number of bytes
   *                     cannot be read.
   */
  public static ByteString read(InputStream inputStream, int size) throws IOException;

  /**
   * Returns a copy of the bytes in this {@link ByteString}. Changes to the returned byte[] will not be
   * reflected in this {@link ByteString}.
   *
   * Where possible prefer other methods for accessing the underlying bytes, such as
   * {@link #asByteBuffer()}, {@link #write(java.io.OutputStream)}, or {@link #asString(Charset)}.
   * The first two make no copy of the byte array, while the last minimizes the amount of copying
   * (constructing a String from a byte[] always involves copying).
   *
   * @return a copy of the bytes in this {@link ByteString}
   */
  public byte[] copyBytes();

  /**
   * Copy the bytes in this {@link ByteString} to the provided byte[] starting at the specified offset.
   *
   * Where possible prefer other methods for accessing the underlying bytes, such as
   * {@link #asByteBuffer()}, {@link #write(java.io.OutputStream)}, or {@link #asString(Charset)}.
   * The first two make no copy of the byte array, while the last minimizes the amount of copying
   * (constructing a String from a byte[] always involves copying).
   *
   * @param dest is the destination to copy the bytes in this {@link ByteString} to.
   * @param offset is the starting offset in the destination to receive the copy.
   */
  public void copyBytes(byte[] dest, int offset);

  /**
   * Returns a read only {@link ByteBuffer} view of this {@link ByteString}. This method makes no copy.
   *
   * @return read only {@link ByteBuffer} view of this {@link ByteString}.
   */
  public ByteBuffer asByteBuffer();

  /**
   * Return a String representation of the bytes in this {@link ByteString}, decoded using the supplied
   * charset.
   *
   * @param charset the charset to use to decode the bytes
   * @return the String representation of this {@link ByteString}
   */
  public String asString(Charset charset);

  /**
   * Return an {@link InputStream} view of the bytes in this {@link ByteString}.
   *
   * @return an {@link InputStream} view of the bytes in this {@link ByteString}
   */
  public InputStream asInputStream();

  /**
   * Writes this {@link ByteString} to a stream without copying the underlying byte[].
   *
   * @param out the stream to write the bytes to
   *
   * @throws IOException if an error occurs while writing to the stream
   */
  public void write(OutputStream out) throws IOException;
}
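The core ByteString idea (copy once on the way in, then hand out read-only views or explicit copies) can be sketched in a few lines. MiniByteString is a hypothetical, stripped-down analogue rather than the R2 class; it omits read(...), write(...), and asInputStream().

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;

/** Hypothetical, minimal analogue of ByteString; not the R2 implementation. */
final class MiniByteString {
  private static final MiniByteString EMPTY = new MiniByteString(new byte[0]);
  private final byte[] bytes; // never exposed directly, so it is effectively immutable

  private MiniByteString(byte[] bytes) {
    this.bytes = bytes;
  }

  static MiniByteString empty() {
    return EMPTY;
  }

  /** Copies once on the way in; throws NullPointerException if bytes is null. */
  static MiniByteString copy(byte[] bytes) {
    return new MiniByteString(Arrays.copyOf(bytes, bytes.length));
  }

  static MiniByteString copyString(String str, Charset charset) {
    // String.getBytes already returns a fresh array, so no extra copy is needed.
    return new MiniByteString(str.getBytes(charset));
  }

  /** Zero-copy, read-only view of the underlying bytes. */
  ByteBuffer asByteBuffer() {
    return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
  }

  /** Explicit copy; callers may modify the result freely. */
  byte[] copyBytes() {
    return Arrays.copyOf(bytes, bytes.length);
  }

  String asString(Charset charset) {
    return new String(bytes, charset);
  }

  int length() {
    return bytes.length;
  }
}
```

The read-only ByteBuffer is what lets callers inspect the entity without paying for a copy, which is why the Javadoc above recommends asByteBuffer() over copyBytes().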

h3. Builders

As mentioned previously, builders provide the following basic functionality:

  • Create a new message
  • Copy a message, modify it, and create a new immutable copy

To create a new message, use RpcRequestBuilder / RpcResponseBuilder or RestRequestBuilder / RestResponseBuilder as appropriate.

Builder methods are designed to be chained. Here is an example of chaining:

final RestResponse res = new RestResponseBuilder()
        .setEntity(new byte[] {1,2,3,4})
        .setHeader("k1", "v1")
        .setStatus(300)
        .build();

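To copy a message, it is sufficient to ask the message for its builder. Typically this is done with the builder() method; when working with an abstract message type, use the corresponding method instead (requestBuilder() for Request, responseBuilder() for Response, rpcBuilder() for RpcMessage, or restBuilder() for RestMessage).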

Here is an example of copying and modifying a message:

final RpcRequest req = ...;
final RpcRequest newReq = req.builder()
                             .setEntity(new byte[] {5,6,7,8})
                             .setURI(URI.create("anotherURI"))
                             .build();

Here is an example of copying and modifying an abstract request:

final Request req = ...;
final Request newReq = req.requestBuilder()
                          .setEntity(new byte[] {5,6,7,8})
                          .setURI(URI.create("anotherURI"))
                          .build();

h3. Callbacks

R2 is, by design, asynchronous in nature. As will be shown below, R2 provides two mechanisms to wait for an asynchronous operation to complete: callbacks and Futures. Futures should be familiar to most Java developers, so we will not discuss them further in this document. Callbacks are less common in Java and warrant some quick discussion.

In R2, the Callback interface looks like:

public interface Callback<T>
{
  /**
   * Called if the asynchronous operation completed with a successful result.
   *
   * @param t the result of the asynchronous operation
   */
  void onSuccess(T t);

  /**
   * Called if the asynchronous operation failed with an error.
   *
   * @param e the error
   */
  void onError(Exception e);
}

In some cases it is only possible to invoke an asynchronous operation with a callback (and not a Future). In those cases, which are not common for external users, it is possible to use a FutureCallback as shown in this example:

final FutureCallback future = new FutureCallback();
asyncOp(..., future);
return future.get();
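A FutureCallback bridges the two styles: the asynchronous operation completes it as a callback, and the caller blocks on get(). The sketch below shows one way such an adapter can work, using a CountDownLatch. SketchCallback mirrors the Callback interface above, and SketchFutureCallback is a hypothetical name; R2's actual FutureCallback may be implemented differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/** Stand-in for R2's Callback<T>, copied from the interface above. */
interface SketchCallback<T> {
  void onSuccess(T t);
  void onError(Exception e);
}

/** Hypothetical callback that can also be waited on like a Future. */
final class SketchFutureCallback<T> implements SketchCallback<T> {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile T result;
  private volatile Exception error;

  @Override
  public void onSuccess(T t) {
    result = t;
    done.countDown(); // release any thread blocked in get()
  }

  @Override
  public void onError(Exception e) {
    error = e;
    done.countDown();
  }

  /** Blocks until onSuccess or onError has been called, then returns or rethrows. */
  T get() throws InterruptedException, ExecutionException {
    done.await();
    if (error != null) {
      throw new ExecutionException(error);
    }
    return result;
  }
}
```

Wrapping the failure in ExecutionException follows the java.util.concurrent.Future convention, so callers handle errors the same way for either style.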

In some cases, code does not need to wait for an operation to complete. With a Future, simply do not call get(). With a callback, use Callbacks.empty(), as shown in this example:

asyncOp(..., Callbacks.empty());

Keep in mind that it will not be possible to know when the operation completed - or even if it completed successfully.

Sometimes code will want to know when an operation has completed, but is not concerned with the result. In this case, a SimpleCallback can be used or adapted to a Callback with Callbacks.adaptSimple(...).
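The roles of Callbacks.empty() and Callbacks.adaptSimple(...) can be sketched as follows. All names here are hypothetical stand-ins: in particular, DoneCallback.onDone() is an assumed shape for a completion-only callback, and R2's SimpleCallback may use a different method name.

```java
/** Stand-in for R2's Callback<T>. */
interface ResultCallback<T> {
  void onSuccess(T t);
  void onError(Exception e);
}

/** Hypothetical completion-only callback (assumed shape, not R2's SimpleCallback). */
interface DoneCallback {
  void onDone();
}

/** Hypothetical helpers mirroring the roles of Callbacks.empty() and Callbacks.adaptSimple(...). */
final class SketchCallbacks {
  private SketchCallbacks() {}

  /** Ignores both outcomes: the caller never learns whether the operation succeeded. */
  static <T> ResultCallback<T> empty() {
    return new ResultCallback<T>() {
      @Override public void onSuccess(T t) {}
      @Override public void onError(Exception e) {}
    };
  }

  /** Signals completion on success or error, discarding the result and the exception. */
  static <T> ResultCallback<T> adaptSimple(final DoneCallback simple) {
    return new ResultCallback<T>() {
      @Override public void onSuccess(T t) { simple.onDone(); }
      @Override public void onError(Exception e) { simple.onDone(); }
    };
  }
}
```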

h2. Client API

The R2 client API provides the mechanism for sending requests to, and receiving responses from, a remote service or resource handler. The diagram below shows where the client sits in the R2 stack.

!https://github.com/linkedin/pegasus/wiki/r2client.png!

The main interface in this layer is the Client interface, shown here:

public interface Client
{
  /**
   * Asynchronously issues the given request and returns a {@link Future} that can be used to wait
   * for the response.
   *
   * @param request the request to issue
   * @return a future to wait for the response
   */
  Future<RestResponse> restRequest(RestRequest request);

  /**
   * Asynchronously issues the given request. The given callback is invoked when the response is
   * received. This event driven approach is typically more complicated to use and is appropriate
   * for building other abstractions, such as a DAG based resolver.
   *
   * @param request the request to issue
   * @param callback the callback to invoke with the response
   */
  void restRequest(RestRequest request, Callback<RestResponse> callback);

  /**
   * Asynchronously issues the given request and returns a {@link Future} that can be used to wait
   * for the response.
   *
   * @param request the request to issue
   * @return a future to wait for the response
   */
  Future<RpcResponse> rpcRequest(RpcRequest request);

  /**
   * Asynchronously issues the given request. The given callback is invoked when the response is
   * received. This event driven approach is typically more complicated to use and is appropriate
   * for building other abstractions, such as a DAG based resolver.
   *
   * @param request the request to issue
   * @param callback the callback to invoke with the response
   */
  void rpcRequest(RpcRequest request, Callback<RpcResponse> callback);

  /**
   * Starts asynchronous shutdown of the client. This method should block minimally, if at all.
   *
   * @param callback a callback to invoke when the shutdown is complete
   */
  void shutdown(Callback<None> callback);
}

There are separate methods for making REST and RPC calls. Requests are made asynchronously using either Futures or Callbacks (see Callback section for details).
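The pairing of a callback method with a Future-returning method can be sketched with a single callback-based primitive and a Future variant built on top of it. This string-based client is hypothetical and much simpler than the Client interface above; EchoClient exists only for demonstration, and the use of CompletableFuture (Java 8+) is an implementation choice of the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** Stand-in for R2's Callback<T>. */
interface ClientCallback<T> {
  void onSuccess(T t);
  void onError(Exception e);
}

/** Hypothetical client: one async primitive plus a Future-based convenience. */
abstract class SketchClient {
  /** Callback-based primitive; implementations call exactly one of onSuccess / onError. */
  abstract void request(String request, ClientCallback<String> callback);

  /** Future-based variant implemented on top of the callback variant. */
  Future<String> request(String request) {
    final CompletableFuture<String> future = new CompletableFuture<String>();
    request(request, new ClientCallback<String>() {
      @Override public void onSuccess(String response) { future.complete(response); }
      @Override public void onError(Exception e) { future.completeExceptionally(e); }
    });
    return future;
  }
}

/** Hypothetical in-memory client that echoes the request back. */
final class EchoClient extends SketchClient {
  @Override
  void request(String request, ClientCallback<String> callback) {
    if (request.isEmpty()) {
      callback.onError(new IllegalArgumentException("empty request"));
    } else {
      callback.onSuccess("echo:" + request);
    }
  }
}
```

Implementing the Future form in terms of the callback form keeps a single code path for sending, while callers who just want to block can call get().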

h2. Request Handler API

The Request Handler API is the server-side counterpart to the client, as shown in this diagram:

!https://github.com/linkedin/pegasus/wiki/r2server.png!

Request Handlers are used for two purposes:

  • Determining the rules for dispatching a request to a service / resource manager or another dispatcher
  • Handling a request (as a service or a resource manager)

We have two Request Handler interfaces, one for each request type (RPC or REST).

The RPC Request Handler interface looks like:

public interface RpcRequestHandler
{
  void handleRequest(RpcRequest request, Callback<RpcResponse> callback);
}

The REST Request Handler interface looks like:

public interface RestRequestHandler
{
  void handleRequest(RestRequest request, Callback<RestResponse> callback);
}
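The first purpose listed above, dispatching, can be sketched as a handler whose only job is to pick another handler. The types below are simplified, hypothetical stand-ins (a URI in, an HTTP-style status code out), and routing on the first path segment is an assumed rule chosen for illustration.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Stand-in for R2's Callback<T>. */
interface HandlerCallback<T> {
  void onSuccess(T t);
  void onError(Exception e);
}

/** Hypothetical simplified handler: takes a URI, answers with a status code. */
interface SketchHandler {
  void handleRequest(URI uri, HandlerCallback<Integer> callback);
}

/** Hypothetical dispatcher routing on the first path segment ("/greetings/1" goes to "greetings"). */
final class PathDispatcher implements SketchHandler {
  private final Map<String, SketchHandler> routes = new HashMap<String, SketchHandler>();

  PathDispatcher register(String segment, SketchHandler handler) {
    routes.put(segment, handler);
    return this;
  }

  @Override
  public void handleRequest(URI uri, HandlerCallback<Integer> callback) {
    String path = uri.getPath() == null ? "" : uri.getPath(); // opaque URIs have no path
    String[] parts = path.split("/"); // "/greetings/1" splits into ["", "greetings", "1"]
    String first = parts.length > 1 ? parts[1] : "";
    SketchHandler target = routes.get(first);
    if (target == null) {
      callback.onSuccess(404); // no route: answer directly instead of forwarding
    } else {
      target.handleRequest(uri, callback); // forward, passing the same callback along
    }
  }
}
```

Because a dispatcher implements the same interface as the handlers it routes to, dispatchers can be nested.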

h2. Filter Chains

The filter chain provides a mechanism for doing special processing for each request and response in the system. For example, logging and statistics collection are appropriate for this layer.

The following diagram shows where filter chains fit in the R2 layering:

!https://github.com/linkedin/pegasus/wiki/r2fc.png!

The FilterChain provides methods for adding new filters and for processing requests, responses, and errors.

Requests pass through the filter chain starting from the beginning and move towards the end. Responses and errors start from the end of the filter chain and move towards the beginning.

If an error occurs while a request moves through the filter chain (either due to a thrown Exception or due to an onError(...) call), the error is first sent to the filter that raised it and then moves back towards the beginning of the filter chain. Filters that come after the filter that raised the error never get a chance to process the request.
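The ordering rules above can be sketched with a toy chain that records which hooks run. This models only synchronously thrown exceptions (not the onError(...) path), simulates the response instead of calling a transport, and uses hypothetical names throughout.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical filter; R2's real filter interfaces are listed in the Filters section. */
interface SketchFilter {
  void onRequest(List<String> trace); // throw to signal an error
  void onResponse(List<String> trace);
  void onError(List<String> trace, Exception e);
}

/** Hypothetical chain that models only the ordering rules. */
final class SketchFilterChain {
  private final List<SketchFilter> filters = new ArrayList<SketchFilter>();

  SketchFilterChain add(SketchFilter f) {
    filters.add(f);
    return this;
  }

  void process(List<String> trace) {
    // Requests move from the beginning of the chain towards the end.
    for (int i = 0; i < filters.size(); i++) {
      try {
        filters.get(i).onRequest(trace);
      } catch (Exception e) {
        // The error goes to the failing filter first, then back towards the beginning;
        // filters after index i never see the request.
        for (int j = i; j >= 0; j--) {
          filters.get(j).onError(trace, e);
        }
        return;
      }
    }
    // No error: the (simulated) response moves from the end back to the beginning.
    for (int j = filters.size() - 1; j >= 0; j--) {
      filters.get(j).onResponse(trace);
    }
  }
}

/** Hypothetical filter that records its hooks, optionally failing on the request. */
final class NamedFilter implements SketchFilter {
  private final String name;
  private final boolean fail;

  NamedFilter(String name, boolean fail) {
    this.name = name;
    this.fail = fail;
  }

  public void onRequest(List<String> trace) {
    trace.add("req:" + name);
    if (fail) {
      throw new IllegalStateException(name + " rejected the request");
    }
  }

  public void onResponse(List<String> trace) { trace.add("res:" + name); }

  public void onError(List<String> trace, Exception e) { trace.add("err:" + name); }
}
```

With filters A, B, C and no failure, the trace is req:A, req:B, req:C, res:C, res:B, res:A. If B fails, it is req:A, req:B, err:B, err:A, and C is never invoked.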

h3. Filters

R2 provides a set of interfaces that can be implemented to intercept different types of messages. They are:

  • Message filters
      • MessageFilter - intercepts both requests and responses
      • RequestFilter
      • ResponseFilter
  • RPC filters
      • RpcFilter - intercepts both RPC requests and RPC responses
      • RpcRequestFilter
      • RpcResponseFilter
  • REST filters
      • RestFilter - intercepts both REST requests and REST responses
      • RestRequestFilter
      • RestResponseFilter

Message filters can be used to handle both REST and RPC messages in an abstract way (as Requests and Responses). RPC filters and REST filters handle messages of a specific type (RPC or REST). A single class can implement both RPC and REST filters. However, the RPC and REST filter hooks override the hooks provided by Message filters, so Message filters should not be combined with RPC or REST filters in the same class.

h3. Wire Attributes

Wire attributes provide a mechanism to send "side-band" data to a remote endpoint along with a request or response. They are exposed at the filter chain layer and can be queried or modified by filters. They are not made available at the request / response layer because the entity (and headers, for REST) should supply all of the data necessary to process a request or response.

Wire attributes are sent as headers with the R2 HTTP transport and as attributes with the C3PO transport. All header names are prefixed with "X-LI-R2-W-".

Common headers include:

  • X-LI-R2-W-MsgType
      • Informs the server of the type of message to expect
      • Value is one of ("RPC", "REST")
  • X-LI-R2-W-IC-1-com.linkedin.container.rpc.trace.rpcTrace
      • Sends RPC trace information to the server
      • RPC trace is used to semi-uniquely identify a request and the hops that the request takes
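The header-prefix convention can be sketched as a small codec. This assumes that attribute names are stored without the prefix and that the transport adds it on the way out and strips it on the way in; WireAttributeCodec is a hypothetical name, not an R2 class. Prefix matching is case-insensitive because HTTP header names are (RFC 2616 section 4.2).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical codec for the "X-LI-R2-W-" header convention. */
final class WireAttributeCodec {
  static final String PREFIX = "X-LI-R2-W-";

  private WireAttributeCodec() {}

  /** Turns wire attributes into HTTP headers by prefixing each attribute name. */
  static Map<String, String> toHeaders(Map<String, String> wireAttrs) {
    Map<String, String> headers = new HashMap<String, String>();
    for (Map.Entry<String, String> e : wireAttrs.entrySet()) {
      headers.put(PREFIX + e.getKey(), e.getValue());
    }
    return headers;
  }

  /** Recovers wire attributes from headers, ignoring ordinary (unprefixed) headers. */
  static Map<String, String> fromHeaders(Map<String, String> headers) {
    Map<String, String> attrs = new HashMap<String, String>();
    for (Map.Entry<String, String> e : headers.entrySet()) {
      String name = e.getKey();
      // Case-insensitive prefix match, since header names are case-insensitive.
      if (name.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
        attrs.put(name.substring(PREFIX.length()), e.getValue());
      }
    }
    return attrs;
  }
}
```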

h3. Local Attributes

Local attributes are used by filters during response processing to get data stored during the request. Filters use this mechanism because responses are not guaranteed to be processed on the same thread as their requests.
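A typical use is measuring latency: the request hook records a start time and the response hook, possibly running on another thread, reads it back. The sketch below passes a per-request map explicitly instead of using a ThreadLocal, which would fail when the response arrives on a different thread. LatencyFilter, the attribute key, and the map-based API are hypothetical and not R2's filter interfaces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical latency filter that keeps per-request state in local attributes. */
final class LatencyFilter {
  static final String START_KEY = "latency.startNanos"; // hypothetical attribute key

  /** Creates a per-request local-attributes map that is safe to touch from several threads. */
  static Map<String, Object> newLocalAttrs() {
    return new ConcurrentHashMap<String, Object>();
  }

  /** Request path: remember when the request passed through this filter. */
  void onRequest(Map<String, Object> localAttrs) {
    localAttrs.put(START_KEY, System.nanoTime());
  }

  /** Response path, possibly on another thread: elapsed nanoseconds, or -1 if no start was recorded. */
  long onResponse(Map<String, Object> localAttrs) {
    Long start = (Long) localAttrs.get(START_KEY);
    return start == null ? -1L : System.nanoTime() - start;
  }
}
```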

h2. Transports

The transport bridges convert our abstract requests, responses, and wire attributes into transport-specific messages. We support an asynchronous HTTP transport and the C3PO (formerly PRPC) transport at this layer; however, our intention is to use only the HTTP transport.

!https://github.com/linkedin/pegasus/wiki/r2lltran.png!

h3. HTTP Transport

In the HTTP transport there is a standard transformation of our REST messages to equivalent HTTP messages.

For RPC, the HTTP transport uses the POST method for all requests. Non-2xx responses are mapped to RPC errors.

Wire attributes are transported as headers, using the attribute name, which starts with X-LI-R2-W-. See the wire attribute section for more details.
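The two RPC-over-HTTP rules above (always POST; non-2xx becomes an error) can be sketched as a small helper. RpcHttpMapping and RpcBodyCallback are hypothetical names, and the exception type and message used for failures are assumptions; R2's real error type may differ.

```java
/** Stand-in for R2's Callback<T>, specialised to RPC response bodies. */
interface RpcBodyCallback {
  void onSuccess(byte[] body);
  void onError(Exception e);
}

/** Hypothetical helper mirroring the RPC-over-HTTP rules. */
final class RpcHttpMapping {
  /** Every RPC request is sent as an HTTP POST. */
  static final String RPC_HTTP_METHOD = "POST";

  private RpcHttpMapping() {}

  /** Only 2xx statuses count as success. */
  static boolean isSuccess(int status) {
    return status >= 200 && status < 300;
  }

  /** Delivers an HTTP result to an RPC callback, turning non-2xx statuses into errors. */
  static void deliver(int status, byte[] body, RpcBodyCallback callback) {
    if (isSuccess(status)) {
      callback.onSuccess(body);
    } else {
      callback.onError(new RuntimeException("RPC failed with HTTP status " + status));
    }
  }
}
```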

h1. Network R2

Network R2 includes code and Spring wiring to integrate R2 with the LinkedIn stack. The source code is available in the r2 module in the network repository.

h2. LinkedIn Filters

R2 includes the following standard filters for both REST and RPC requests:

  • Client Filters
      • TransactionValidationFilter
          • Handles cases where a transaction is open while making a remote call
          • Can be configured to ignore the error, log the error, or throw an Exception
      • ICValidationFilter
          • Checks that the IC has been initialized before making a remote call
          • Can be configured to ignore the error, log the error, or throw an Exception
      • TraceValidationFilter
          • Checks that the RpcTrace has been set before making a remote call
          • Can be configured to ignore the error, log the error, or throw an Exception
      • ClientMessageTraceFilter
          • Updates the RPC trace information before a remote invocation
      • ClientICFilter
          • Sends IC information through wire attributes
          • On a response from the server, merges 2-way attributes
  • Server Filters
      • ServerICFilter
          • Loads IC information from wire attributes
          • Sends updated 2-way attributes with the response
      • ServerMessageTraceFilter
          • Sets RPC trace information in the MDC (context map used for logging)
      • ServerPageKeyFilter
          • Sets page key information in the MDC (context map used for logging)
      • PublicAccessLogFilter
          • Logs requests, their latency, and their results
  • Common Filters
      • LoggingFilter
          • At debug level, logs trace messages for request, response, and error events

These filters are included in the standard Spring components, which are described below.

h2. Spring Components

There are two components that can be consumed in R2: the client factory component and the server dispatcher component.

h3. Client Factory Component

The client-factory-cmpt exposes a TransportClientFactory that can be used to create new Clients (Clients are described earlier in this document). By default the Client uses the HTTP protocol.

There is no call tracking in the client-factory-cmpt because this component does not know what services are being called. D2 provides a dynamic call tracker in its DynamicClient.

It is possible to add additional filters into the client factory's filter chain by wiring in a "preFilters" or "postFilters" list bean. "preFilters" are added to the beginning of the filter chain, while "postFilters" are added to the end of the filter chain.

h3. Server Component

The server-cmpt takes a dispatcher and produces a servlet that can be used with a servlet container. Its primary responsibility, beyond providing the servlet, is to provide all of the integration hooks into the LinkedIn stack. Much of this integration is described above in the LinkedIn Filters section.

It is possible to add additional filters into the server dispatcher's filter chain by wiring in a "preFilters" or "postFilters" list bean. "preFilters" are added to the beginning of the filter chain, while "postFilters" are added to the end of the filter chain.
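The resulting order can be sketched as a simple list assembly: preFilters first, the component's standard filters in the middle, postFilters last. The actual wiring is done with Spring list beans; ChainAssembly is a hypothetical helper that only illustrates the order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration of where preFilters and postFilters end up in a chain. */
final class ChainAssembly {
  private ChainAssembly() {}

  static <F> List<F> assemble(List<F> preFilters, List<F> standardFilters, List<F> postFilters) {
    List<F> chain = new ArrayList<F>();
    chain.addAll(preFilters);      // "preFilters" go at the beginning of the chain
    chain.addAll(standardFilters); // the component's built-in filters
    chain.addAll(postFilters);     // "postFilters" go at the end of the chain
    return Collections.unmodifiableList(chain);
  }
}
```

Given the filter chain ordering rules, a preFilter sees each request first and each response last, while a postFilter sees requests last and responses first.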
