Skip to content

Examples

Jeff Lindsay edited this page Aug 19, 2021 · 32 revisions

Simple RPC

Here is an example using qtalk for traditional unary RPC calls to remote methods. Exporting functions is done with a reflection based helper fn.HandlerFrom, which works on structs with methods as well as individual functions.

// server.go
package main

import (
	"fmt"
	"log"
	"net"
	"strings"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/fn"
	"github.com/progrium/qtalk-go/rpc"
)

type service struct{}

func (svc *service) Upper(s string) string {
	return strings.ToUpper(s)
}

// methods can opt-in to receive the call as last argument.
// also, errors can be returned to be received as remote errors.
func (svc *service) Error(s string, c *rpc.Call) error {
	return fmt.Errorf("%s [%s]", s, c.Selector)
}

func main() {
	// create a tcp listener
	l, err := net.Listen("tcp", "localhost:9999")
	if err != nil {
		log.Fatal(err)
	}

	// setup a server using fn.HandlerFrom to
	// handle methods from the service type
	srv := &rpc.Server{
		Codec:   codec.JSONCodec{},
		Handler: fn.HandlerFrom(new(service)),
	}

	// serve until the listener closes
	srv.Serve(l)
}

On the client side, we start using the talk package which makes setting up a client easy. You have the option of building it up on your own, making a network connection, setting up a mux session, then making an RPC client, but this is much more convenient in the common case.

// client.go
package main

import (
	"context"
	"log"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/fn"
	"github.com/progrium/qtalk-go/talk"
)

func main() {
	ctx := context.Background()

	// use talk.Dial to get a client
	client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// call Upper and print the string return value
	var ret string
	_, err = client.Call(ctx, "Upper", fn.Args{"hello world"}, &ret)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(ret)

	// call Error and expect err to be the returned error
	_, err = client.Call(ctx, "Error", fn.Args{"user error"}, nil)
	log.Println(err)

}

Call also returns a Response value that we discard here, but when doing simple unary calls like this we usually only care for the return value and/or the error. The Response value is used for more advanced scenarios, like streaming.

Bidirectional Calling

Here is a simple client and server using WebSocket to let the server call a sum function on the client when it connects. We still call the dialing side client and the accepting side server, but their roles as caller and responder are reversed.

// client.go
package main

import (
	"log"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/fn"
	"github.com/progrium/qtalk-go/talk"
)

func sum(a, b int) int {
	return a + b
}

func main() {
	// establish connection to server over websocket using JSON
	client, err := talk.Dial("ws", "localhost:8080", codec.JSONCodec{})
	if err != nil {
		log.Fatal(err)
	}

	// create handler from sum function for selector "sum"
	client.Handle("sum", fn.HandlerFrom(sum))

	// normally run in a goroutine, respond will sit and
	// respond to incoming calls with registered handlers
	client.Respond()
}

This example shows the other side than usual making a call, but the point of bidirectional calling is that both sides can make and respond to calls at any time. The protocol is symmetrical.

// server.go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/mux"
	"github.com/progrium/qtalk-go/talk"
)

func fatal(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	// create a websocket mux listener
	l, err := mux.ListenWS("localhost:8080")
	fatal(err)

	// accept a mux session
	// and make a peer
	sess, err := l.Accept()
	fatal(err)
	server := talk.NewPeer(sess, codec.JSONCodec{})
	defer server.Close()

	// call sum
	var result int
	_, err = server.Call(context.Background(), "sum", []int{2, 3}, &result)
	fatal(err)

	fmt.Println("result:", result)
}

This not only gives us more flexibility in how the connection topology for callers and responders are set up, but it opens up the possibility for the general pattern of callbacks.

Selector Routing

We can have many different handlers responding to different selectors using a RespondMux. This has similar semantics to an http.ServeMux, but allows selectors to use dot notation or path form, and makes it easy to nest muxes since they are also Handlers. When fn.HandlerFrom is given a struct, it's actually building a RespondMux with handlers for each method.

// server.go
package main

import (
	"fmt"
	"log"
	"net"
	"path"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/fn"
	"github.com/progrium/qtalk-go/rpc"
)

type dog struct{}

func (d *dog) Bark() string {
	return "BARK!"
}

func (d *dog) Growl() string {
	return "Grrrr"
}

// when structs used with fn.HandlerFrom also implement
// the rpc.Handler interface, it will be used as a catch-all
// for selectors that don't match method names
func (d *dog) RespondRPC(r rpc.Responder, c *rpc.Call) {
	// selectors are normalized to a path form when passed
	// into the handler, ie "dog.Rollover" becomes "/dog/Rollover"
	r.Return(fmt.Sprintf("%s??", path.Base(c.Selector)))
}

func main() {
	// create a tcp listener
	l, err := net.Listen("tcp", "localhost:9999")
	if err != nil {
		log.Fatal(err)
	}

	// create a respond mux to handle different selector patterns
	mux := rpc.NewRespondMux()
	mux.Handle("dog", fn.HandlerFrom(new(dog)))
	mux.Handle("dog.owner.Speak", rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
		r.Return("I love my dog!")
	}))

	// create a sub-mux for foo that will match selectors starting with foo.bar.
	sub := rpc.NewRespondMux()
	sub.Handle("bar.", rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
		r.Return(c.Selector)
	}))
	mux.Handle("foo", sub)

	// setup a server using the RespondMux
	srv := &rpc.Server{
		Codec:   codec.JSONCodec{},
		Handler: mux,
	}

	// serve until the listener closes
	srv.Serve(l)
}

This client simply iterates over a list of selectors to call and prints the selector with its returned reply.

// client.go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/talk"
)

func main() {
	ctx := context.Background()

	// use talk.Dial to get a client
	client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	selectors := []string{"dog.Bark", "dog/Growl", "dog.Dance", "dog.owner.Speak", "foo.bar.baz.qux"}

	// call with each selector and print the string return value
	for _, sel := range selectors {
		var ret string
		if _, err := client.Call(ctx, sel, nil, &ret); err != nil {
			log.Fatal(err)
		}
		fmt.Println(sel, "=>", ret)
	}

}

Output:

dog.Bark => BARK!
dog/Growl => Grrrr
dog.Dance => Dance??
dog.owner.Speak => I love my dog!
foo.bar.baz.qux => /foo/bar/baz/qux

This shows we can use dot or slash separators (or mixed!) as desired for our application, but handlers will always get a selector in rooted path form as seen in the last result.

Streaming Responses

It's increasingly common for RPC systems to stream multiple results. To do this, we forgo the fn.HandlerFrom convenience and implement an rpc.Handler, which gives much more control over how calls are handled. You might notice the similarity to net/http, including a rpc.HandlerFunc convenience for implementing a Handler with just a function.

// server.go
package main

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/rpc"
)

func main() {
	// create a tcp listener
	l, err := net.Listen("tcp", "localhost:9999")
	if err != nil {
		log.Fatal(err)
	}

	// setup a server with a single HandlerFunc handler
	srv := &rpc.Server{
		Codec: codec.JSONCodec{},
		Handler: rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
			// receive the counter start value argument
			var counter int
			if err := c.Receive(&counter); err != nil {
				r.Return(fmt.Errorf("unable to receive argument: %s", err))
				return
			}
			// respond with a continue
			if _, err := r.Continue(nil); err != nil {
				log.Println(err)
				return
			}
			// now increment and send the counter
			// in a loop with 1s interval
			for {
				counter += 1
				if err := r.Send(counter); err != nil {
					return
				}
				time.Sleep(1 * time.Second)
			}
		}),
	}

	// serve until the listener closes
	srv.Serve(l)
}

The rpc.Responder used in the handler provides two ways to respond. Other than a normal Return, we can also Continue, which lets us either Send/Receive more values or use the underlying channel for byte-level IO.

In our client we finally start using the Response value returned by call.

// client.go
package main

import (
	"context"
	"log"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/talk"
)

func main() {
	ctx := context.Background()

	// use talk.Dial to get a client
	client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// make call with empty selector
	// passing argument value of 10
	resp, err := client.Call(ctx, "", 10, nil)
	if err != nil {
		log.Fatal(err)
	}
	// make sure we got a continued response
	if !resp.Continue {
		log.Fatal("expected continued response")
	}
	// receive and print a counter value
	// until EOF or any error
	for {
		var counter int
		if err := resp.Receive(&counter); err != nil {
			return
		}
		log.Println(counter)
	}
}

The selector string in Call is optional if there is only one handler on the responding side. We also have the option to not use the normal reply value in favor of working with the Response value directly. In this case, we check that it's a "continued" response (not immediately closed) and then Receive from it until an error or EOF. Send and Receive operations use the codec specified when setting up the client for marshaling values.

In all these examples, we're using a builtin JSON codec that wraps encoding/json, but codecs for other formats like msgpack, protobufs, or something custom can be used as well.

Bytestream Proxy

We can also ignore RPC semantics and marshaling codecs entirely and use calls to set up "virtual connections" or channels to do byte-level IO. This is similar to hijacking HTTP connections where you stop speaking HTTP and start using it like a raw TCP socket, though in this case we're only hijacking the multiplexed channel not the whole connection.

This example is more complicated but shows how you can create proxies and tunnels. Our server in this example will make a WebSocket connection to a specified endpoint for every incoming call, then join the calling channel with the WebSocket connection like a reverse proxy.

// server.go
package main

import (
	"flag"
	"io"
	"log"
	"net"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/rpc"
	"golang.org/x/net/websocket"
)

func fatal(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	flag.Parse()

	// create a tcp listener
	l, err := net.Listen("tcp", "localhost:9999")
	fatal(err)

	// setup a server with a HandlerFunc
	srv := &rpc.Server{
		Codec: codec.JSONCodec{},
		Handler: rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
			// discard any arguments
			fatal(c.Receive(nil))

			// immediately continue,
			// getting the underlying channel
			ch, err := r.Continue(nil)
			if err != nil {
				log.Println(err)
				return
			}
			defer ch.Close()

			log.Printf("qtalk://localhost:9999 <=> %s", flag.Arg(0))

			// dial actual backend
			backend, err := websocket.Dial(flag.Arg(0), "", "http://localhost:9999")
			if err != nil {
				log.Println(err)
				return
			}
			defer backend.Close()

			// copy in both directions
			// creating a proxy until EOF
			go io.Copy(backend, ch)
			io.Copy(ch, backend)
		}),
	}

	// serve until the listener closes
	log.Println("backend listening on localhost:9999")
	fatal(srv.Serve(l))
}

The client sets up a connection to our server, then sets up a local WebSocket server frontend. For every request, it will call to our server and join the incoming WebSocket connection with the call channel, effectively connecting it with the connection our server makes to its configured endpoint.

package main

import (
	"context"
	"io"
	"log"
	"net/http"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/talk"
	"golang.org/x/net/websocket"
)

func fatal(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	// use talk.Dial to get a client
	client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
	fatal(err)
	defer client.Close()

	log.Println("connected to backend at localhost:9999")

	// setup http+websocket server frontend
	srv := &http.Server{
		Addr: "localhost:8888",
		Handler: &websocket.Server{
			Handshake: nil,
			Handler: websocket.Handler(func(ws *websocket.Conn) {
				defer ws.Close()

				// for every websocket connection,
				// make a call with our client
				ctx := context.Background()
				resp, err := client.Call(ctx, "", nil, nil)
				fatal(err)

				// get the channel from response
				ch := resp.Channel
				defer ch.Close()

				log.Println("ws://localhost:8888 <=> qtalk://localhost:9999")

				// copy in both directions
				// creating a proxy until EOF
				go io.Copy(ch, ws)
				io.Copy(ws, ch)
			})},
	}

	// start the frontend server
	log.Println("frontend listening on localhost:8888")
	fatal(srv.ListenAndServe())
}

The end result is a WebSocket frontend client that tunnels the connection via qtalk to our server, which then opens a connection to a WebSocket backend joining it with the original WebSocket connection from the client. Although not used here, we could also include argument and return values before using the channel this way for extra metadata and signaling.

Another way to look at this is you can have RPC calls that come with an "attached" full-duplex pipe. Imagine an RPC call to provision a database that comes with an immediate tunneled client connection to it on success.

State Synchronization

This last example shows a simplified technique of state synchronization or replication. Our server has a list of usernames connected, which is our state. When a client connects, it calls Join to add its username to the list. This also registers the client to receive a callback passing the list of usernames whenever it changes. The client can then call Leave, or if it disconnects abruptly it will be unregistered with the next update.

// server.go
package main

import (
	"context"
	"log"
	"net"
	"sync"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/fn"
	"github.com/progrium/qtalk-go/rpc"
)

// State contains a map of usernames to callers,
// which are used as a callback client to that user
type State struct {
	users sync.Map
}

// Users gets a list of usernames from the keys of the sync.Map
func (s *State) Users() (users []string) {
	s.users.Range(func(k, v interface{}) bool {
		users = append(users, k.(string))
		return true
	})
	return
}

// Join adds a username and caller using the injected rpc.Call
// value, then broadcasts the change
func (s *State) Join(username string, c *rpc.Call) {
	s.users.Store(username, c.Caller)
	s.broadcast()
}

// Leave removes the user from the sync.Map and broadcasts
func (s *State) Leave(username string) {
	s.users.Delete(username)
	s.broadcast()
}

// broadcast uses the rpc.Caller values to perform a callback
// with the "state" selector, passing the current list of
// usernames. any callers that return an error are added to
// gone and then removed with Leave
func (s *State) broadcast() {
	users := s.Users()
	var gone []string
	s.users.Range(func(k, v interface{}) bool {
		_, err := v.(rpc.Caller).Call(context.Background(), "state", users, nil)
		if err != nil {
			log.Println(k.(string), err)
			gone = append(gone, k.(string))
		}
	})
	for _, u := range gone {
		s.Leave(u)
	}
}

func main() {
	// create a tcp listener
	l, err := net.Listen("tcp", "localhost:9999")
	if err != nil {
		log.Fatal(err)
	}

	// setup a server using fn.HandlerFrom to
	// handle methods from the state value
	srv := &rpc.Server{
		Codec:   codec.JSONCodec{},
		Handler: fn.HandlerFrom(new(State)),
	}

	// serve until the listener closes
	srv.Serve(l)
}

The Call pointer that handlers get contains a reference to a Caller, which is a client that can be used to make calls back to the caller, allowing callbacks.

Our client is straightforward. After setting up a connection and a handler to receive and display an updated username listing, we call Join with a username, wait for SIGINT, and call Leave before exiting.

// client.go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/progrium/qtalk-go/codec"
	"github.com/progrium/qtalk-go/fn"
	"github.com/progrium/qtalk-go/rpc"
	"github.com/progrium/qtalk-go/talk"
)

func fatal(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	flag.Parse()

	// establish connection to server
	client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
	fatal(err)

	// state callback handler that redraws the user list
	client.Handle("state", rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
		var users interface{}
		if err := c.Receive(&users); err != nil {
			log.Println(err)
			return
		}
		// the nonsense are terminal escape codes
		// to return to the last line and clear it
		fmt.Println("\u001B[1A\u001B[K", users)
	}))

	// respond to incoming calls
	go client.Respond()

	// call Join passing a username from arguments
	_, err = client.Call(context.Background(), "Join", fn.Args{flag.Arg(0)}, nil)
	fatal(err)

	// wait until we get SIGINT
	ch := make(chan os.Signal)
	signal.Notify(ch, os.Interrupt)
	<-ch

	// call Leave before finishing
	_, err = client.Call(context.Background(), "Leave", fn.Args{flag.Arg(0)}, nil)
	fatal(err)
}

A variation of this state replication pattern is using a streaming handler to return the current state and then stream state updates. Another approach to using callbacks would be where the caller specifies a callback selector, potentially to a temporary callback handler. See the fn package for helpers to create fn.Ptr values from functions that can be embedded into arguments and added to your RespondMux.

Clone this wiki locally