Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service feature #96

Open
wants to merge 17 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package turnpike

import (
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
"unicode"
"unicode/utf8"
)

var (
Expand Down Expand Up @@ -36,9 +41,16 @@ type Client struct {
listeners map[ID]chan Message
events map[ID]*eventDesc
procedures map[ID]*procedureDesc
serviceMap map[string]*service
requestCount uint
}

type service struct {
name string
procedures map[string]reflect.Method
topics map[string]reflect.Method
}

type procedureDesc struct {
name string
handler MethodHandler
Expand Down Expand Up @@ -510,3 +522,237 @@ func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]in
return result, nil
}
}

// RegisterService registers in the dealer the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - at least one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods.
// The client accesses each method using a string of the form "type.method",
// where type is the receiver's concrete type.
func (c *Client) RegisterService(rcvr interface{}) error {
return c.registerService(rcvr, "", false)
}

// RegisterServiceName is like RegisterService but uses the provided name for the type
// instead of the receiver's concrete type.
func (c *Client) RegisterServiceName(name string, rcvr interface{}) error {
return c.registerService(rcvr, name, true)
}

var typeOfError = reflect.TypeOf((*error)(nil)).Elem()

// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}

func (c *Client) registerService(rcvr interface{}, name string, useName bool) error {
if c.serviceMap == nil {
c.serviceMap = make(map[string]*service)
}
typ := reflect.TypeOf(rcvr)
val := reflect.ValueOf(rcvr)
sname := reflect.Indirect(val).Type().Name()
if name != "" {
sname = name
}
if !isExported(sname) && !useName {
return fmt.Errorf("type %s is not exported", sname)
}
if _, present := c.serviceMap[sname]; present {
return fmt.Errorf("service %s is already defined", sname)
}

procedures := make(map[string]reflect.Method)
topics := make(map[string]reflect.Method)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}

// Method needs at least one out to be a procedure
numOut := mtype.NumOut()
if numOut == 0 {
// A method with zero outs must start with On to be a event listener
if strings.HasPrefix(mname, "On") {
topics[mname] = method
}
continue
}

// Method last out must be error
if errType := mtype.Out(numOut - 1); errType != typeOfError {
continue
}

procedures[mname] = method
}

// Register methods as procedures with Dealer
for mname, value := range procedures {
procedure := value
namespace := sname + "." + mname
f := func(args []interface{}, kwargs map[string]interface{}, details map[string]interface{}) (callResult *CallResult) {
methodArgs := procedure.Type.NumIn() - 1
if methodArgs != len(args) {
err := fmt.Errorf("method %s has %d inputs, was called with %d arguments as %s procedure ", procedure.Name, methodArgs, len(args), namespace)
return &CallResult{
Args: []interface{}{err.Error()},
Err: ErrInvalidArgument,
}
}
values := make([]reflect.Value, len(args))
var err error
for i, arg := range args {
in := procedure.Type.In(i + 1)
values[i], err = decodeArgument(in, arg)
if err != nil {
return &CallResult{
Args: []interface{}{err.Error()},
Err: ErrInvalidArgument,
}
}
}
values = append([]reflect.Value{val}, values...)
function := procedure.Func
returnValues := function.Call(values)

result := make([]interface{}, len(returnValues))
for i := range returnValues {
result[i] = returnValues[i].Interface()
}
return &CallResult{
Args: result,
}
}
if err := c.Register(namespace, f, map[string]interface{}{}); err != nil {
return err
}
}

// Subscribe methods to topics with the broker
for mname, value := range topics {
topic := value
namespace := sname + "." + mname
f := func(args []interface{}, kwargs map[string]interface{}) {
methodArgs := topic.Type.NumIn() - 1
if methodArgs != len(args) {
log.Printf("event %s has %d inputs, was published with %d arguments", namespace, methodArgs, len(args))
return
}
values := make([]reflect.Value, len(args))
var err error
for i, arg := range args {
in := topic.Type.In(i + 1)
values[i], err = decodeArgument(in, arg)
if err != nil {
log.Println(err)
return
}
}
values = append([]reflect.Value{val}, values...)
topic.Func.Call(values)
}
if err := c.Subscribe(namespace, f); err != nil {
return err
}
}
c.serviceMap[sname] = &service{
procedures: procedures,
topics: topics,
name: sname,
}
return nil
}

func decodeArgument(target reflect.Type, arg interface{}) (reflect.Value, error) {
if arg == nil {
return reflect.Zero(target), nil
}
val := reflect.New(target)
b, err := json.Marshal(arg)
if err != nil {
return val, err
}
err = json.Unmarshal(b, val.Interface())
v := reflect.Indirect(val)
return v, err
}

// UnregisterService unsubscribes the service event listeners and unregisters the service procedures.
func (c *Client) UnregisterService(name string) error {
service, present := c.serviceMap[name]
if !present {
return fmt.Errorf("service %s is not defined", name)
}
errs := newErrCol()
for procedureName := range service.procedures {
err := c.Unregister(service.name + "." + procedureName)
errs.IfErrAppend(err)
}
for topicName := range service.topics {
err := c.Unsubscribe(service.name + "." + topicName)
errs.IfErrAppend(err)
}
c.serviceMap[name] = nil
if len(errs) == 0 {
return nil
}
return errs
}

// CallService invokes the service method 'namespace' with arguments 'args'.
// The return values are saved in 'replies'.
func (c *Client) CallService(namespace string, args []interface{}, replies ...interface{}) error {
for i := 0; i < len(replies); i++ {
if replies[i] == nil {
continue
}
kind := reflect.ValueOf(replies[i]).Kind()
if kind != reflect.Ptr {
return fmt.Errorf("reply[%d] type %s is not a pointer", i, kind)
}
}

res, err := c.Call(namespace, args, nil)
if err != nil {
return err
}
returnValues := res.Arguments
if len(returnValues)-1 < len(replies) {
return fmt.Errorf("expected %d return values, got %d", len(replies), len(returnValues)-1)
}
if len(returnValues) == 0 {
return fmt.Errorf("expected at least one return value of type string, nil or error")
}

for i := 0; i < len(replies); i++ {
if replies[i] == nil {
continue
}
vReplies := reflect.ValueOf(replies[i])
tReplies := vReplies.Type()
val, err := decodeArgument(tReplies, returnValues[i])
if err != nil {
return err
}
vReplies.Elem().Set(reflect.Indirect(val))
}
switch e := returnValues[len(returnValues)-1].(type) {
case string:
return fmt.Errorf("%s", e)
case nil:
return nil
case error:
return e
default:
return fmt.Errorf("expected the last return value to be of type string, nil or error got %T", e)
}
}
Loading