-
-
Notifications
You must be signed in to change notification settings - Fork 4
Examples
Here is an example using qtalk for traditional unary RPC calls to remote methods. Exporting
functions is done with a reflection based helper fn.HandlerFrom, which works
on structs with methods as well as individual functions.
// server.go
package main
import (
"fmt"
"log"
"net"
"strings"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/fn"
"github.com/progrium/qtalk-go/rpc"
)
// service is the receiver type whose methods are exported
// as remote procedures.
type service struct{}

// Upper returns s converted to upper case.
func (svc *service) Upper(s string) string {
	upper := strings.ToUpper(s)
	return upper
}
// Error always fails, returning an error built from s and the
// selector of the call.
//
// methods can opt-in to receive the call as last argument.
// also, errors can be returned to be received as remote errors.
func (svc *service) Error(s string, c *rpc.Call) error {
	selector := c.Selector
	return fmt.Errorf("%s [%s]", s, selector)
}
func main() {
// create a tcp listener
l, err := net.Listen("tcp", "localhost:9999")
if err != nil {
log.Fatal(err)
}
// setup a server using fn.HandlerFrom to
// handle methods from the service type
srv := &rpc.Server{
Codec: codec.JSONCodec{},
Handler: fn.HandlerFrom(new(service)),
}
// serve until the listener closes
srv.Serve(l)
}On the client side, we start using the talk package which makes setting up a client
easy. You have the option of building it up on your own, making a network connection,
setting up a mux session, then making an RPC client, but this is much more convenient
in the common case.
// client.go
package main
import (
"context"
"log"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/fn"
"github.com/progrium/qtalk-go/talk"
)
func main() {
ctx := context.Background()
// use talk.Dial to get a client
client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// call Upper and print the string return value
var ret string
_, err = client.Call(ctx, "Upper", fn.Args{"hello world"}, &ret)
if err != nil {
log.Fatal(err)
}
log.Println(ret)
// call Error and expect err to be the returned error
_, err = client.Call(ctx, "Error", fn.Args{"user error"}, nil)
log.Println(err)
}Call also returns a Response value that we discard here, but when doing simple unary calls like this we usually only care for the return value and/or the error. The Response value is used for more advanced scenarios, like streaming.
Here is a simple client and server using WebSocket to let the server call a sum function on the client when it connects. We still call the dialing side client and the accepting side server, but their roles as caller and responder are reversed.
// client.go
package main
import (
"log"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/fn"
"github.com/progrium/qtalk-go/talk"
)
// sum returns the total of a and b.
func sum(a, b int) int {
	total := a
	total += b
	return total
}
func main() {
// establish connection to server over websocket using JSON
client, err := talk.Dial("ws", "localhost:8080", codec.JSONCodec{})
if err != nil {
log.Fatal(err)
}
// create handler from sum function for selector "sum"
client.Handle("sum", fn.HandlerFrom(sum))
// normally run in a goroutine, respond will sit and
// respond to incoming calls with registered handlers
client.Respond()
}This example shows the other side than usual making a call, but the point of bidirectional calling is that both sides can make and respond to calls at any time. The protocol is symmetrical.
// server.go
package main
import (
"context"
"fmt"
"log"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/mux"
"github.com/progrium/qtalk-go/talk"
)
// fatal passes err to log.Fatal when it is non-nil;
// a nil err is a no-op.
func fatal(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
func main() {
// create a websocket mux listener
l, err := mux.ListenWS("localhost:8080")
fatal(err)
// accept a mux session
// and make a peer
sess, err := l.Accept()
fatal(err)
server := talk.NewPeer(sess, codec.JSONCodec{})
defer server.Close()
// call sum
var result int
_, err = server.Call(context.Background(), "sum", []int{2, 3}, &result)
fatal(err)
fmt.Println("result:", result)
}This not only gives us more flexibility in how the connection topology for callers and responders are set up, but it opens up the possibility for the general pattern of callbacks.
We can have many different handlers responding to different selectors using a RespondMux. This has similar
semantics to an http.ServeMux, but allows selectors to use dot notation or path form, and makes
it easy to nest muxes since they are also Handlers. When fn.HandlerFrom is given a struct, it's
actually building a RespondMux with handlers for each method.
// server.go
package main
import (
"fmt"
"log"
"net"
"path"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/fn"
"github.com/progrium/qtalk-go/rpc"
)
// dog is a stateless type whose methods each return a fixed reply.
type dog struct{}

// Bark returns the bark reply.
func (d *dog) Bark() string {
	const bark = "BARK!"
	return bark
}

// Growl returns the growl reply.
func (d *dog) Growl() string {
	const growl = "Grrrr"
	return growl
}
// RespondRPC replies with the last element of the call's selector
// followed by "??".
//
// when structs used with fn.HandlerFrom also implement
// the rpc.Handler interface, it will be used as a catch-all
// for selectors that don't match method names
func (d *dog) RespondRPC(r rpc.Responder, c *rpc.Call) {
	// selectors are normalized to a path form when passed
	// into the handler, ie "dog.Rollover" becomes "/dog/Rollover"
	name := path.Base(c.Selector)
	reply := fmt.Sprintf("%s??", name)
	r.Return(reply)
}
func main() {
// create a tcp listener
l, err := net.Listen("tcp", "localhost:9999")
if err != nil {
log.Fatal(err)
}
// create a respond mux to handle different selector patterns
mux := rpc.NewRespondMux()
mux.Handle("dog", fn.HandlerFrom(new(dog)))
mux.Handle("dog.owner.Speak", rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
r.Return("I love my dog!")
}))
// create a sub-mux for foo that will match selectors starting with foo.bar.
sub := rpc.NewRespondMux()
sub.Handle("bar.", rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
r.Return(c.Selector)
}))
mux.Handle("foo", sub)
// setup a server using the RespondMux
srv := &rpc.Server{
Codec: codec.JSONCodec{},
Handler: mux,
}
// serve until the listener closes
srv.Serve(l)
}This client simply iterates over a list of selectors to call and prints the selector with its returned reply.
// client.go
package main
import (
"context"
"fmt"
"log"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/talk"
)
func main() {
ctx := context.Background()
// use talk.Dial to get a client
client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
if err != nil {
log.Fatal(err)
}
defer client.Close()
selectors := []string{"dog.Bark", "dog/Growl", "dog.Dance", "dog.owner.Speak", "foo.bar.baz.qux"}
// call with each selector and print the string return value
for _, sel := range selectors {
var ret string
if _, err := client.Call(ctx, sel, nil, &ret); err != nil {
log.Fatal(err)
}
fmt.Println(sel, "=>", ret)
}
}Output:
dog.Bark => BARK!
dog/Growl => Grrrr
dog.Dance => Dance??
dog.owner.Speak => I love my dog!
foo.bar.baz.qux => /foo/bar/baz/qux
This shows we can use dot or slash separators (or mixed!) as desired for our application, but handlers will always get a selector in rooted path form as seen in the last result.
It's increasingly common for RPC systems to stream multiple results. To do this,
we forgo the fn.HandlerFrom convenience and implement an rpc.Handler, which
gives much more control over how calls are handled. You might notice the similarity
to net/http, including a rpc.HandlerFunc convenience for implementing a Handler
with just a function.
// server.go
package main
import (
"fmt"
"log"
"net"
"time"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/rpc"
)
func main() {
// create a tcp listener
l, err := net.Listen("tcp", "localhost:9999")
if err != nil {
log.Fatal(err)
}
// setup a server with a single HandlerFunc handler
srv := &rpc.Server{
Codec: codec.JSONCodec{},
Handler: rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
// receive the counter start value argument
var counter int
if err := c.Receive(&counter); err != nil {
r.Return(fmt.Errorf("unable to receive argument: %s", err))
return
}
// respond with a continue
if _, err := r.Continue(nil); err != nil {
log.Println(err)
return
}
// now increment and send the counter
// in a loop with 1s interval
for {
counter += 1
if err := r.Send(counter); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}),
}
// serve until the listener closes
srv.Serve(l)
}The rpc.Responder used in the handler provides two ways to respond. Other than
a normal Return, we can also Continue, which lets us either Send/Receive more values or use
the underlying channel for byte-level IO.
In our client we finally start using the Response value returned by Call.
// client.go
package main
import (
"context"
"log"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/talk"
)
func main() {
ctx := context.Background()
// use talk.Dial to get a client
client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// make call with empty selector
// passing argument value of 10
resp, err := client.Call(ctx, "", 10, nil)
if err != nil {
log.Fatal(err)
}
// make sure we got a continued response
if !resp.Continue {
log.Fatal("expected continued response")
}
// receive and print a counter value
// until EOF or any error
for {
var counter int
if err := resp.Receive(&counter); err != nil {
return
}
log.Println(counter)
}
}The selector string in Call is optional if there is only one handler on the responding side. We also have the option to not use the normal reply value in favor of working with the Response value directly. In this case, we check that it's a "continued" response (not immediately closed) and then Receive from it until an error or EOF. Send and Receive operations use the codec specified when setting up the client for marshaling values.
In all these examples, we're using a builtin JSON codec that wraps encoding/json, but codecs
for other formats like msgpack, protobufs, or something custom can be used as well.
We can also ignore RPC semantics and marshaling codecs entirely and use calls to set up "virtual connections" or channels to do byte-level IO. This is similar to hijacking HTTP connections where you stop speaking HTTP and start using it like a raw TCP socket, though in this case we're only hijacking the multiplexed channel, not the whole connection.
This example is more complicated but shows how you can create proxies and tunnels. Our server in this example will make a WebSocket connection to a specified endpoint for every incoming call, then join the calling channel with the WebSocket connection like a reverse proxy.
// server.go
package main
import (
"flag"
"io"
"log"
"net"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/rpc"
"golang.org/x/net/websocket"
)
// fatal hands a non-nil err to log.Fatal and
// does nothing when err is nil.
func fatal(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
func main() {
flag.Parse()
// create a tcp listener
l, err := net.Listen("tcp", "localhost:9999")
fatal(err)
// setup a server with a HandlerFunc
srv := &rpc.Server{
Codec: codec.JSONCodec{},
Handler: rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
// discard any arguments
fatal(c.Receive(nil))
// immediately continue,
// getting the underlying channel
ch, err := r.Continue(nil)
if err != nil {
log.Println(err)
return
}
defer ch.Close()
log.Printf("qtalk://localhost:9999 <=> %s", flag.Arg(0))
// dial actual backend
backend, err := websocket.Dial(flag.Arg(0), "", "http://localhost:9999")
if err != nil {
log.Println(err)
return
}
defer backend.Close()
// copy in both directions
// creating a proxy until EOF
go io.Copy(backend, ch)
io.Copy(ch, backend)
}),
}
// serve until the listener closes
log.Println("backend listening on localhost:9999")
fatal(srv.Serve(l))
}The client sets up a connection to our server, then sets up a local WebSocket server frontend. For every request, it will call to our server and join the incoming WebSocket connection with the call channel, effectively connecting it with the connection our server makes to its configured endpoint.
package main
import (
"context"
"io"
"log"
"net/http"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/talk"
"golang.org/x/net/websocket"
)
// fatal calls log.Fatal with err unless err is nil,
// in which case it returns immediately.
func fatal(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
func main() {
// use talk.Dial to get a client
client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
fatal(err)
defer client.Close()
log.Println("connected to backend at localhost:9999")
// setup http+websocket server frontend
srv := &http.Server{
Addr: "localhost:8888",
Handler: &websocket.Server{
Handshake: nil,
Handler: websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close()
// for every websocket connection,
// make a call with our client
ctx := context.Background()
resp, err := client.Call(ctx, "", nil, nil)
fatal(err)
// get the channel from response
ch := resp.Channel
defer ch.Close()
log.Println("ws://localhost:8888 <=> qtalk://localhost:9999")
// copy in both directions
// creating a proxy until EOF
go io.Copy(ch, ws)
io.Copy(ws, ch)
})},
}
// start the frontend server
log.Println("frontend listening on localhost:8888")
fatal(srv.ListenAndServe())
}The end result is a WebSocket frontend client that tunnels the connection via qtalk to our server, which then opens a connection to a WebSocket backend joining it with the original WebSocket connection from the client. Although not used here, we could also include argument and return values before using the channel this way for extra metadata and signaling.
Another way to look at this is you can have RPC calls that come with an "attached" full-duplex pipe. Imagine an RPC call to provision a database that comes with an immediate tunneled client connection to it on success.
This last example shows a simplified technique of state synchronization or replication. Our server keeps a list of connected usernames, which is our state. When a client connects, it calls Join to add its username to the list. This also registers the client to receive a callback passing the list of usernames whenever it changes. The client can then call Leave, or if it disconnects abruptly it will be unregistered with the next update.
// server.go
package main
import (
"context"
"log"
"net"
"sync"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/fn"
"github.com/progrium/qtalk-go/rpc"
)
// State contains a map of usernames to callers,
// which are used as a callback client to that user
type State struct {
	users sync.Map
}

// Users collects the usernames stored as keys of the sync.Map.
// It returns a nil slice when no user is stored.
func (s *State) Users() []string {
	var names []string
	collect := func(key, _ interface{}) bool {
		names = append(names, key.(string))
		return true // keep iterating over every entry
	}
	s.users.Range(collect)
	return names
}
// Join stores the caller of the injected rpc.Call value under
// username, then broadcasts the change
func (s *State) Join(username string, c *rpc.Call) {
	caller := c.Caller
	s.users.Store(username, caller)
	s.broadcast()
}
// Leave deletes username from the sync.Map and then broadcasts
func (s *State) Leave(username string) {
	// the deferred broadcast runs after the delete below
	defer s.broadcast()
	s.users.Delete(username)
}
// broadcast uses the rpc.Caller values to perform a callback
// with the "state" selector, passing the current list of
// usernames. any callers that return an error are added to
// gone and then removed with Leave
func (s *State) broadcast() {
	users := s.Users()
	var gone []string
	s.users.Range(func(k, v interface{}) bool {
		name := k.(string)
		_, err := v.(rpc.Caller).Call(context.Background(), "state", users, nil)
		if err != nil {
			log.Println(name, err)
			gone = append(gone, name)
		}
		// the Range callback must return a bool; true keeps
		// iterating so every stored caller gets the update
		return true
	})
	// removal happens after the Range so the map is not
	// modified from inside its own iteration callback
	for _, u := range gone {
		s.Leave(u)
	}
}
func main() {
// create a tcp listener
l, err := net.Listen("tcp", "localhost:9999")
if err != nil {
log.Fatal(err)
}
// setup a server using fn.HandlerFrom to
// handle methods from the state value
srv := &rpc.Server{
Codec: codec.JSONCodec{},
Handler: fn.HandlerFrom(new(State)),
}
// serve until the listener closes
srv.Serve(l)
}The Call pointer that handlers get contains a reference to a Caller, which is a client that can be used to make calls back to the caller, allowing callbacks.
Our client is straightforward. After setting up a connection and a handler to receive and display an updated username listing, we call Join with a username, wait for SIGINT, and call Leave before exiting.
// client.go
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"github.com/progrium/qtalk-go/codec"
"github.com/progrium/qtalk-go/fn"
"github.com/progrium/qtalk-go/rpc"
"github.com/progrium/qtalk-go/talk"
)
// fatal reports err through log.Fatal when it is set
// and returns without doing anything when err is nil.
func fatal(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
func main() {
flag.Parse()
// establish connection to server
client, err := talk.Dial("tcp", "localhost:9999", codec.JSONCodec{})
fatal(err)
// state callback handler that redraws the user list
client.Handle("state", rpc.HandlerFunc(func(r rpc.Responder, c *rpc.Call) {
var users interface{}
if err := c.Receive(&users); err != nil {
log.Println(err)
return
}
// the nonsense are terminal escape codes
// to return to the last line and clear it
fmt.Println("\u001B[1A\u001B[K", users)
}))
// respond to incoming calls
go client.Respond()
// call Join passing a username from arguments
_, err = client.Call(context.Background(), "Join", fn.Args{flag.Arg(0)}, nil)
fatal(err)
// wait until we get SIGINT
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt)
<-ch
// call Leave before finishing
_, err = client.Call(context.Background(), "Leave", fn.Args{flag.Arg(0)}, nil)
fatal(err)
}A variation of this state replication pattern is using a streaming handler to return the current state
and then stream state updates. Another approach to using callbacks would be where the caller specifies
a callback selector, potentially to a temporary callback handler. See the fn package for helpers to
create fn.Ptr values from functions that can be embedded into arguments and added to your RespondMux.