diff --git a/src/Grpc/src/AbstractCall.php b/src/Grpc/src/AbstractCall.php new file mode 100644 index 0000000000..1062536d32 --- /dev/null +++ b/src/Grpc/src/AbstractCall.php @@ -0,0 +1,215 @@ +curl_handler = curl_init(); + $headers[] = "Content-Type: application/grpc"; + $headers[] = "TE: trailers"; + $options = array( + CURLOPT_URL => $method, + CURLOPT_RETURNTRANSFER => true, + CURLOPT_HTTPHEADER => $headers, + CURLOPT_POST => true, + CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE, + CURLOPT_SSL_VERIFYHOST => 0, + CURLOPT_SSL_VERIFYPEER => 0, + CURLINFO_HEADER_OUT => true, + CURLOPT_HEADERFUNCTION => array($this, 'header_callback'), + CURLOPT_VERBOSE => 1, + ); + curl_setopt_array($this->curl_handler, $options); + $this->channel = $channel; + $this->multi_handler = $channel->getMh(); + $this->deserialize = $deserialize; + $this->metadata = null; + $this->trailing_metadata = null; + $this->channel->register_ch($this->curl_handler); + } + + public function header_callback($ch, $header_line) + { + $first_colume_idx = strpos($header_line, ':'); + if ($first_colume_idx === false) { + $key = $header_line; + $value = ""; + } else { + $key = substr($header_line, 0, $first_colume_idx); + $value = substr($header_line, $first_colume_idx + 1); + } + // Header received after receiving data frame is counted as trailer + if($this->call_status == "receive_response_begin"){ + // TODO create a struct to save trailer + // TODO: update trailer process after new libcurl is released + // Currently, libcurl has bug which all trailers after 2nd are read incorrectly. + // https://github.com/curl/curl/commit/fa3dbb9a147488a2943bda809c66fc497efe06cb + // Only precess the first trailer "grpc-status" for now. + if($key == "grpc-status") { + echo "[AbstractCall.php] Receive Treailer: $header_line" . PHP_EOL; + } + } else { + // TODO create a struct to save header + echo "[AbstractCall.php] Receive Header: $header_line".PHP_EOL; + } + return strlen($header_line); + } + + /** + * @param mixed $argument The data to decode + * @return string Base64 decoded massage + */ + public function _encode($argument) { + $serialized_data = $argument->serializeToString(); + $len = strlen($serialized_data); + $bytesArray = [0, 0, 0, 0, 0]; + for ($i = 4; $i >= 1; $i--) { + $bytesArray[$i] = ($len % 256); + $len = $len >> 8; + } + $serialized_data_byte = unpack('C*', $serialized_data); + $argument_data_byte = array_merge($bytesArray, $serialized_data_byte); + $argument_data_string = implode(array_map("chr", $argument_data_byte)); + return $argument_data_string; + } + + /** + * @return mixed The metadata sent by the server + */ + public function getMetadata() + { + } + + /** + * @return mixed The trailing metadata sent by the server + */ + public function getTrailingMetadata() + { + } + + /** + * @return string The URI of the endpoint + */ + public function getPeer() + { + } + + /** + * Cancels the call. + */ + public function cancel() + { + } + + /** + * Serialize a message to the protobuf binary format. + * + * @param mixed $data The Protobuf message + * + * @return string The protobuf binary format + */ + protected function _serializeMessage($data) + { + // Proto3 implementation + if (method_exists($data, 'encode')) { + return $data->encode(); + } elseif (method_exists($data, 'serializeToString')) { + return $data->serializeToString(); + } + + // Protobuf-PHP implementation + return $data->serialize(); + } + + /** + * Deserialize a response value to an object. + * + * @param string $value The binary value to deserialize + * + * @return mixed The deserialized value + */ + protected function _deserializeResponse($value) + { + if ($value === null) { + return; + } + // Proto3 implementation + if (is_array($this->deserialize)) { + list($className, $deserializeFunc) = $this->deserialize; + $obj = new $className(); + if (method_exists($obj, $deserializeFunc)) { + $obj->$deserializeFunc($value); + } else { + $obj->mergeFromString($value); + } + + return $obj; + } + + // Protobuf-PHP implementation + return call_user_func($this->deserialize, $value); + } + + /** + * Set the CallCredentials for the underlying Call. + * + * @param CallCredentials $call_credentials The CallCredentials object + */ + public function setCallCredentials($call_credentials) + { + } +} diff --git a/src/Grpc/src/BaseStub.php b/src/Grpc/src/BaseStub.php new file mode 100644 index 0000000000..ffb32c2589 --- /dev/null +++ b/src/Grpc/src/BaseStub.php @@ -0,0 +1,191 @@ +exec) + // libcurl_version >= 7.43.0 to support multiplexing + // libcurl_version >= 7.47.0 to support trailer + // libcurl_version >= 7.49.0 to support http2_prior_knowledge + + $this->hostname = $hostname; + $this->update_metadata = null; + if (isset($opts['update_metadata'])) { + if (is_callable($opts['update_metadata'])) { + $this->update_metadata = $opts['update_metadata']; + } + unset($opts['update_metadata']); + } + // $package_config = json_decode( + // file_get_contents(dirname(__FILE__).'/../../composer.json'), true); + if (!empty($opts['grpc.primary_user_agent'])) { + $opts['grpc.primary_user_agent'] .= ' '; + } else { + $opts['grpc.primary_user_agent'] = ''; + } + if (!empty($opts['grpc.ssl_target_name_override'])) { + $this->hostname_override = $opts['grpc.ssl_target_name_override']; + } + //$opts['grpc.primary_user_agent'] .= + // 'grpc-php/'.$package_config['version']; + if ($channel) { + if (!is_a($channel, 'resource')) { + throw new \Exception('The channel argument is not a'. + 'cURL_multi_resource object'); + } + $this->channel = $channel; + } else { + $this->channel = new \Grpc\Channel($hostname, $opts); + } + } + + /** + * Close the communication channel associated with this stub. + */ + public function close() + { + curl_multi_close($this->channel); + } + + /** + * constructs the auth uri for the jwt. + * + * @param string $method The method string + * + * @return string The URL string + */ + private function _get_jwt_aud_uri($method) + { + if ($this->hostname_override) { + $hostname = $this->hostname_override; + } else { + $hostname = $this->hostname; + } + $prefix = 'http://'; + return $prefix.$hostname.$method; + } + + /** + * validate and normalize the metadata array. + * + * @param array $metadata The metadata map + * + * @return array $metadata Validated and key-normalized metadata map + * @throw InvalidArgumentException if key contains invalid characters + */ + private function _validate_and_normalize_metadata($metadata) + { + $metadata_copy = []; + foreach ($metadata as $key => $value) { + if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) { + throw new \InvalidArgumentException( + 'Metadata keys must be nonempty strings containing only '. + 'alphanumeric characters, hyphens and underscores'); + } + $metadata_copy[strtolower($key)] = $value; + } + + return $metadata_copy; + } + + /* This class is intended to be subclassed by generated code, so + * all functions begin with "_" to avoid name collisions. */ + + /** + * Call a remote method that takes a single argument and has a + * single output. + * + * @param string $method The name of the method to call + * @param mixed $argument The argument to the method + * @param callable $deserialize A function that deserializes the response + * @param array $metadata A metadata map to send to the server + * (optional) + * @param array $options An array of options (optional) + * + * @return UnaryCall The active call object + */ + protected function _simpleRequest($method, + $argument, + $deserialize, + array $metadata = [], + array $options = []) + { + // TODO: $this->update_metadata like grpc + $call = new UnaryCall($this->channel, + $this->_get_jwt_aud_uri($method), + $deserialize, + $options); + + $call->start($argument, $metadata, $options); + + return $call; + } + + /** + * Call a remote method that takes a single argument and returns a stream + * of responses. + * + * @param string $method The name of the method to call + * @param mixed $argument The argument to the method + * @param callable $deserialize A function that deserializes the responses + * @param array $metadata A metadata map to send to the server + * (optional) + * @param array $options An array of options (optional) + * + * @return ServerStreamingCall The active call object + */ + protected function _serverStreamRequest($method, + $argument, + $deserialize, + array $metadata = [], + array $options = []) + { + $call = new ServerStreamingCall($this->channel, + $this->_get_jwt_aud_uri($method), + $deserialize, + $options); + $call->start($argument, $metadata, $options); + return $call; + } + +} \ No newline at end of file diff --git a/src/Grpc/src/Channel.php b/src/Grpc/src/Channel.php new file mode 100644 index 0000000000..89afa22c96 --- /dev/null +++ b/src/Grpc/src/Channel.php @@ -0,0 +1,84 @@ +mh; + } + + public function getCurlOpts() { + return $this->curl_opts; + } + /** + * Construct an instance of the Channel class. If the $args array contains a + * "credentials" key mapping to a ChannelCredentials object, a secure channel + * will be created with those credentials. + * + * @param string $target The hostname to associate with this channel + * @param array $args The arguments to pass to the Channel (optional) + * + * @throws \InvalidArgumentException + */ + public function __construct($target, $args = array()) { + $this->mh = curl_multi_init(); + curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, 100); + curl_multi_setopt($this->mh, CURLMOPT_MAXCONNECTS, 100); + $this->registered_ch = array(); + //TODO: transfer $args to $curl_opts + } + + public function register_ch($ch){ + array_push($this->registered_ch, $ch); + } + + // TODO: change is_ch_finished and unregister_ch to a faster way. + public function is_ch_finished($ch){ + // If a ch is finished, it is removed from the registered array. + if(in_array($ch, $this->registered_ch)){ + return false; + } + return true; + } + + public function unregister_ch($ch){ + $pos = array_search($ch, $this->registered_ch); + unset($this->registered_ch[$pos]); + } + + /* + * TODO: support grpc API + * public function getTarget() {} + * public function getConnectivityState($try_to_connect = false) {} + * public function watchConnectivityState($last_state, Timeval $deadline_obj) {} + * public function close() {} + */ +} diff --git a/src/Grpc/src/Constants.php b/src/Grpc/src/Constants.php new file mode 100644 index 0000000000..d88a5406b9 --- /dev/null +++ b/src/Grpc/src/Constants.php @@ -0,0 +1,37 @@ + + * @link https://github.com/iMega/grpc-phpdoc + */ +/** + * Grpc + * @see http://grpc.io + * @see https://github.com/grpc/grpc/tree/master/src/php/ext/grpc + */ +// TODO: add constants +namespace Grpc +{ + /** + * Register status constants + */ + + /** + * Not an error; returned on success + */ + const STATUS_OK = 0; + + /** + * The operation was cancelled (typically by the caller). + */ + const STATUS_CANCELLED = 1; + + /** + * Unknown error. An example of where this error may be returned is + * if a Status value received from another address space belongs to + * an error-space that is not known in this address space. Also + * errors raised by APIs that do not return enough error information + * may be converted to this error. + */ + const STATUS_UNKNOWN = 2; +} diff --git a/src/Grpc/src/ServerStreamingCall.php b/src/Grpc/src/ServerStreamingCall.php new file mode 100644 index 0000000000..9911bb044a --- /dev/null +++ b/src/Grpc/src/ServerStreamingCall.php @@ -0,0 +1,168 @@ +received_data = new \ArrayIterator(); + parent::__construct($channel, $method, $deserialize, $options); + } + /** + * Start the call. + * + * @param mixed $data The data to send + * @param array $metadata Metadata to send with the call, if applicable + * (optional) + * @param array $options An array of options, possible keys: + * 'flags' => a number (optional) + */ + public function start($data, array $metadata = [], array $options = []) + { + $this->request_data = $this->_encode($data); + curl_setopt($this->curl_handler, CURLOPT_READFUNCTION, function ($ch, $fh, $length = false) + { + $start_pos = $this->request_data_pos; + $this->request_data_pos += 1024; + if($this->request_data_pos > strlen($this->request_data)){ + // send last request part; + $this->call_status = "send_request_done"; + } + $request_len = min(strlen($this->request_data)-$start_pos, 1024); + return substr($this->request_data, $start_pos, $request_len); + }); + + curl_setopt($this->curl_handler, CURLOPT_WRITEFUNCTION, function(&$ch, $data) { + // echo "[ServerStreamingCall.php] chunk received: ".$data."\n"; + $this->call_status = "receive_response_begin"; + $data_len = strlen($data); + $pos = 0; + while($pos < $data_len) { + switch ($this->status){ + case "init": + // TODO: add compression bit check + $this->status = "length"; + break; + case "length": + if($this->frame_len_read_pos == 4){ + $this->frame_len = 0; + $this->frame_len_read_pos = 0; + } + $this->frame_len = ($this->frame_len << 8) + ord($data[$pos]); + $this->frame_len_read_pos++; + if($this->frame_len_read_pos == 4){ + $this->status = "message"; + } + break; + case "message": + if($this->frame_pos < $this->frame_len) { + $this->frame_data = $this->frame_data.$data[$pos]; + } + $this->frame_pos++; + if($this->frame_pos == $this->frame_len){ + $this->status = "init"; + $this->frame_pos = 0; + $this->received_data->append($this->_deserializeResponse($this->frame_data)); + $this->has_new_received_data = true; + $this->frame_data = ""; + } + break; + case "invalid": + break; + default: + break; + } + $pos++; + } + return strlen($data); // returning non-positive number aborts further transfer + }); + + curl_multi_add_handle($this->multi_handler, $this->curl_handler); + curl_multi_setopt($this->multi_handler, CURLMOPT_PIPELINING, 1); + curl_multi_setopt($this->multi_handler, CURLMOPT_MAXCONNECTS, 1); + + curl_multi_add_handle($this->multi_handler, $this->curl_handler); + $active = null; + while($this->call_status != "send_request_done"){ + curl_multi_exec($this->multi_handler, $active); + } + } + + /** + * @return mixed An iterator of response values + */ + public function responses() + { + $active = null; + while (!$this->channel->is_ch_finished($this->curl_handler) || $this->received_data->valid()) { + // echo $active." "; + if($this->received_data->valid()){ + yield $this->received_data->current(); + $this->received_data->next(); + } + if(!$this->channel->is_ch_finished($this->curl_handler)) { + do { + curl_multi_exec($this->multi_handler, $active); + curl_multi_select($this->multi_handler); + $info = curl_multi_info_read($this->multi_handler); + if (false !== $info) { + $done_ch = $info['handle']; + $this->channel->unregister_ch($done_ch); + } + if ($this->has_new_received_data) { + $this->receive_new_data = false; + break; + } + } while ($active > 0 && !$this->has_new_received_data); + } + } + + curl_multi_remove_handle($this->multi_handler, $this->curl_handler); + curl_close($this->curl_handler); + } + + /** + * TODO: support grpc APIs: + * public function getStatus() {} + * public function getMetadata() {} + */ + +} diff --git a/src/Grpc/src/UnaryCall.php b/src/Grpc/src/UnaryCall.php new file mode 100644 index 0000000000..9f5b122df9 --- /dev/null +++ b/src/Grpc/src/UnaryCall.php @@ -0,0 +1,154 @@ + a number (optional) + */ + + public function start($data, array $metadata = [], array $options = []) + { + $this->request_data = $this->_encode($data); + curl_setopt($this->curl_handler, CURLOPT_READFUNCTION, function ($ch, $fh, $length = false) + { + $start_pos = $this->request_data_pos; + $this->request_data_pos += 1024; + if($this->request_data_pos > strlen($this->request_data)){ + // send last request part; + $this->call_status = "send_request_done"; + } + $request_len = min(strlen($this->request_data)-$start_pos, 1024); + return substr($this->request_data, $start_pos, $request_len); + }); + + curl_setopt($this->curl_handler, CURLOPT_WRITEFUNCTION, function(&$ch, $data) { + // echo "[UnaryCall.php] chunk received: ".$data."\n"; + $this->call_status = "receive_response_begin"; + $data_len = strlen($data); + $pos = 0; + while($pos < $data_len) { + switch ($this->status){ + case "init": + // TODO: add compression bit check + $this->status = "length"; + break; + case "length": + if($this->frame_len_read_pos == 4){ + $this->frame_len = 0; + $this->frame_len_read_pos = 0; + } + $this->frame_len = ($this->frame_len << 8) + ord($data[$pos]); + $this->frame_len_read_pos++; + if($this->frame_len_read_pos == 4){ + $this->status = "message"; + } + break; + case "message": + if($this->frame_pos < $this->frame_len) { + $this->frame_data = $this->frame_data.$data[$pos]; + } + $this->frame_pos++; + if($this->frame_pos == $this->frame_len){ + $this->status = "init"; + $this->frame_pos = 0; + $this->received_data = $this->_deserializeResponse($this->frame_data); + $this->has_new_received_data = true; + $this->frame_data = ""; + } + break; + case "invalid": + break; + default: + break; + } + $pos++; + } + return strlen($data); // returning non-positive number aborts further transfer + }); + + curl_multi_add_handle($this->multi_handler, $this->curl_handler); + curl_multi_setopt($this->multi_handler, CURLMOPT_PIPELINING, 1); + curl_multi_setopt($this->multi_handler, CURLMOPT_MAXCONNECTS, 1); + + curl_multi_add_handle($this->multi_handler, $this->curl_handler); + $active = null; + while($this->call_status != "send_request_done"){ + curl_multi_exec($this->multi_handler, $active); + } + } + + /** + * Wait for the server to respond with data and a status. + * + * @return array [response data, status] + */ + public function wait() + { + $active = null; + do { + if($this->has_new_received_data) { + // TODO: close until ($this->stream_end=true)$info = curl_multi_info_read($this->multi_handler); + curl_multi_remove_handle($this->multi_handler, $this->curl_handler); + curl_close($this->curl_handler); + // TODO: return value should be grpc-status from trailer. + return [$this->received_data, 200]; + } + curl_multi_exec($this->multi_handler, $active); + curl_multi_select($this->multi_handler); + $info = curl_multi_info_read($this->multi_handler); + if (false !== $info) { + $done_ch = $info['handle']; + $this->channel->unregister_ch($done_ch); + } + } while ($active > 0 || $this->has_new_received_data); + } + + /** + * TODO: support grpc APIs: + * public function getStatus() {} + * public function getMetadata() {} + */ +}