internal/jsonrpc2: add telemetry to the rpc system

This uses the opencensus compatability later to track all the json rpc calls in
and out.

Change-Id: Ib719879a8d6855b6e6479a4f1b01fe823b548110
Reviewed-on: https://go-review.googlesource.com/c/tools/+/183248
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
This commit is contained in:
Ian Cottrell 2019-06-21 16:26:30 -04:00
parent 47ea21585c
commit a6ef77d3cb
2 changed files with 134 additions and 54 deletions

View File

@ -11,10 +11,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"runtime/trace"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"golang.org/x/tools/internal/lsp/telemetry"
"golang.org/x/tools/internal/lsp/telemetry/stats"
"golang.org/x/tools/internal/lsp/telemetry/tag"
"golang.org/x/tools/internal/lsp/telemetry/trace"
) )
// Conn is a JSON RPC 2 client server connection. // Conn is a JSON RPC 2 client server connection.
@ -35,9 +39,10 @@ type Conn struct {
} }
type queueEntry struct { type queueEntry struct {
ctx context.Context ctx context.Context
c *Conn c *Conn
r *Request r *Request
size int64
} }
// Handler is an option you can pass to NewConn to handle incoming requests. // Handler is an option you can pass to NewConn to handle incoming requests.
@ -56,6 +61,64 @@ type Handler func(context.Context, *Conn, *Request)
// instead. // instead.
type Canceler func(context.Context, *Conn, *Request) type Canceler func(context.Context, *Conn, *Request)
type rpcStats struct {
server bool
method string
ctx context.Context
span trace.Span
start time.Time
received int64
sent int64
}
type statsKeyType string
const rpcStatsKey = statsKeyType("rpcStatsKey")
func start(ctx context.Context, server bool, method string, id *ID) (context.Context, *rpcStats) {
s := &rpcStats{
server: server,
method: method,
ctx: ctx,
start: time.Now(),
}
s.ctx = context.WithValue(s.ctx, rpcStatsKey, s)
tags := make([]tag.Mutator, 0, 4)
tags = append(tags, tag.Upsert(telemetry.KeyMethod, method))
mode := telemetry.Outbound
spanKind := trace.SpanKindClient
if server {
spanKind = trace.SpanKindServer
mode = telemetry.Inbound
}
tags = append(tags, tag.Upsert(telemetry.KeyRPCDirection, mode))
if id != nil {
tags = append(tags, tag.Upsert(telemetry.KeyRPCID, id.String()))
}
s.ctx, s.span = trace.StartSpan(ctx, method, trace.WithSpanKind(spanKind))
s.ctx, _ = tag.New(s.ctx, tags...)
stats.Record(s.ctx, telemetry.Started.M(1))
return s.ctx, s
}
func (s *rpcStats) end(ctx context.Context, err *error) {
if err != nil && *err != nil {
ctx, _ = tag.New(ctx, tag.Upsert(telemetry.KeyStatus, "ERROR"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(telemetry.KeyStatus, "OK"))
}
elapsedTime := time.Since(s.start)
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
stats.Record(ctx,
telemetry.ReceivedBytes.M(s.received),
telemetry.SentBytes.M(s.sent),
telemetry.Latency.M(latencyMillis),
)
s.span.End()
}
// NewErrorf builds a Error struct for the suppied message and code. // NewErrorf builds a Error struct for the suppied message and code.
// If args is not empty, message and args will be passed to Sprintf. // If args is not empty, message and args will be passed to Sprintf.
func NewErrorf(code int64, format string, args ...interface{}) *Error { func NewErrorf(code int64, format string, args ...interface{}) *Error {
@ -103,9 +166,10 @@ func (c *Conn) Cancel(id ID) {
// Notify is called to send a notification request over the connection. // Notify is called to send a notification request over the connection.
// It will return as soon as the notification has been sent, as no response is // It will return as soon as the notification has been sent, as no response is
// possible. // possible.
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error { func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
ctx, task := trace.NewTask(ctx, "jsonrpc2.Notify "+method) ctx, rpcStats := start(ctx, false, method, nil)
defer task.End() defer rpcStats.end(ctx, &err)
jsonParams, err := marshalToRaw(params) jsonParams, err := marshalToRaw(params)
if err != nil { if err != nil {
return fmt.Errorf("marshalling notify parameters: %v", err) return fmt.Errorf("marshalling notify parameters: %v", err)
@ -119,22 +183,23 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}) er
return fmt.Errorf("marshalling notify request: %v", err) return fmt.Errorf("marshalling notify request: %v", err)
} }
c.Logger(Send, nil, -1, request.Method, request.Params, nil) c.Logger(Send, nil, -1, request.Method, request.Params, nil)
return c.stream.Write(ctx, data) n, err := c.stream.Write(ctx, data)
rpcStats.sent += n
return err
} }
// Call sends a request over the connection and then waits for a response. // Call sends a request over the connection and then waits for a response.
// If the response is not an error, it will be decoded into result. // If the response is not an error, it will be decoded into result.
// result must be of a type you an pass to json.Unmarshal. // result must be of a type you an pass to json.Unmarshal.
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error { func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) (err error) {
ctx, task := trace.NewTask(ctx, "jsonrpc2.Call "+method) // generate a new request identifier
defer task.End() id := ID{Number: atomic.AddInt64(&c.seq, 1)}
ctx, rpcStats := start(ctx, false, method, &id)
defer rpcStats.end(ctx, &err)
jsonParams, err := marshalToRaw(params) jsonParams, err := marshalToRaw(params)
if err != nil { if err != nil {
return fmt.Errorf("marshalling call parameters: %v", err) return fmt.Errorf("marshalling call parameters: %v", err)
} }
// generate a new request identifier
id := ID{Number: atomic.AddInt64(&c.seq, 1)}
trace.Logf(ctx, "jsonrpc2", "request id %v", id)
request := &Request{ request := &Request{
ID: &id, ID: &id,
Method: method, Method: method,
@ -160,7 +225,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
// now we are ready to send // now we are ready to send
before := time.Now() before := time.Now()
c.Logger(Send, request.ID, -1, request.Method, request.Params, nil) c.Logger(Send, request.ID, -1, request.Method, request.Params, nil)
if err := c.stream.Write(ctx, data); err != nil { n, err := c.stream.Write(ctx, data)
rpcStats.sent += n
if err != nil {
// sending failed, we will never get a response, so don't leave it pending // sending failed, we will never get a response, so don't leave it pending
return err return err
} }
@ -192,8 +259,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
// You must call this exactly once for any given request. // You must call this exactly once for any given request.
// If err is set then result will be ignored. // If err is set then result will be ignored.
func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err error) error { func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err error) error {
ctx, task := trace.NewTask(ctx, "jsonrpc2.Reply "+req.Method) ctx, st := trace.StartSpan(ctx, req.Method+":reply", trace.WithSpanKind(trace.SpanKindClient))
defer task.End() defer st.End()
if req.IsNotify() { if req.IsNotify() {
return fmt.Errorf("reply not invoked with a valid call") return fmt.Errorf("reply not invoked with a valid call")
} }
@ -228,7 +296,17 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err
return err return err
} }
c.Logger(Send, response.ID, elapsed, req.Method, response.Result, response.Error) c.Logger(Send, response.ID, elapsed, req.Method, response.Result, response.Error)
if err = c.stream.Write(ctx, data); err != nil { n, err := c.stream.Write(ctx, data)
v := ctx.Value(rpcStatsKey)
if v != nil {
s := v.(*rpcStats)
s.sent += n
} else {
//panic("no stats available in reply")
}
if err != nil {
// TODO(iancottrell): if a stream write fails, we really need to shut down // TODO(iancottrell): if a stream write fails, we really need to shut down
// the whole stream // the whole stream
return err return err
@ -253,8 +331,8 @@ type combined struct {
Error *Error `json:"error,omitempty"` Error *Error `json:"error,omitempty"`
} }
func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request) bool { func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request, size int64) bool {
e := queueEntry{ctx: ctx, c: c, r: request} e := queueEntry{ctx: ctx, c: c, r: request, size: size}
if !c.RejectIfOverloaded { if !c.RejectIfOverloaded {
q <- e q <- e
return true return true
@ -281,17 +359,15 @@ func (c *Conn) Run(ctx context.Context) error {
if e.ctx.Err() != nil { if e.ctx.Err() != nil {
continue continue
} }
ctx, task := trace.NewTask(e.ctx, "jsonrpc2.Handle "+e.r.Method) ctx, rpcStats := start(ctx, true, e.r.Method, e.r.ID)
if !e.r.IsNotify() { rpcStats.received += e.size
trace.Logf(ctx, "jsonrpc2", "request id %v", e.r.ID)
}
c.Handler(ctx, e.c, e.r) c.Handler(ctx, e.c, e.r)
task.End() rpcStats.end(ctx, nil)
} }
}() }()
for { for {
// get the data for a message // get the data for a message
data, err := c.stream.Read(ctx) data, n, err := c.stream.Read(ctx)
if err != nil { if err != nil {
// the stream failed, we cannot continue // the stream failed, we cannot continue
return err return err
@ -316,7 +392,7 @@ func (c *Conn) Run(ctx context.Context) error {
if request.IsNotify() { if request.IsNotify() {
c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil) c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
// we have a Notify, add to the processor queue // we have a Notify, add to the processor queue
c.deliver(ctx, q, request) c.deliver(ctx, q, request, n)
//TODO: log when we drop a message? //TODO: log when we drop a message?
} else { } else {
// we have a Call, add to the processor queue // we have a Call, add to the processor queue
@ -329,7 +405,7 @@ func (c *Conn) Run(ctx context.Context) error {
} }
c.handlingMu.Unlock() c.handlingMu.Unlock()
c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil) c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
if !c.deliver(reqCtx, q, request) { if !c.deliver(reqCtx, q, request, n) {
// queue is full, reject the message by directly replying // queue is full, reject the message by directly replying
c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue")) c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue"))
} }

View File

@ -22,10 +22,10 @@ import (
type Stream interface { type Stream interface {
// Read gets the next message from the stream. // Read gets the next message from the stream.
// It is never called concurrently. // It is never called concurrently.
Read(context.Context) ([]byte, error) Read(context.Context) ([]byte, int64, error)
// Write sends a message to the stream. // Write sends a message to the stream.
// It must be safe for concurrent use. // It must be safe for concurrent use.
Write(context.Context, []byte) error Write(context.Context, []byte) (int64, error)
} }
// NewStream returns a Stream built on top of an io.Reader and io.Writer // NewStream returns a Stream built on top of an io.Reader and io.Writer
@ -44,29 +44,29 @@ type plainStream struct {
out io.Writer out io.Writer
} }
func (s *plainStream) Read(ctx context.Context) ([]byte, error) { func (s *plainStream) Read(ctx context.Context) ([]byte, int64, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, 0, ctx.Err()
default: default:
} }
var raw json.RawMessage var raw json.RawMessage
if err := s.in.Decode(&raw); err != nil { if err := s.in.Decode(&raw); err != nil {
return nil, err return nil, 0, err
} }
return raw, nil return raw, int64(len(raw)), nil
} }
func (s *plainStream) Write(ctx context.Context, data []byte) error { func (s *plainStream) Write(ctx context.Context, data []byte) (int64, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return 0, ctx.Err()
default: default:
} }
s.outMu.Lock() s.outMu.Lock()
_, err := s.out.Write(data) n, err := s.out.Write(data)
s.outMu.Unlock() s.outMu.Unlock()
return err return int64(n), err
} }
// NewHeaderStream returns a Stream built on top of an io.Reader and io.Writer // NewHeaderStream returns a Stream built on top of an io.Reader and io.Writer
@ -85,18 +85,19 @@ type headerStream struct {
out io.Writer out io.Writer
} }
func (s *headerStream) Read(ctx context.Context) ([]byte, error) { func (s *headerStream) Read(ctx context.Context) ([]byte, int64, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, 0, ctx.Err()
default: default:
} }
var length int64 var total, length int64
// read the header, stop on the first empty line // read the header, stop on the first empty line
for { for {
line, err := s.in.ReadString('\n') line, err := s.in.ReadString('\n')
total += int64(len(line))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed reading header line %q", err) return nil, total, fmt.Errorf("failed reading header line %q", err)
} }
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
// check we have a header line // check we have a header line
@ -105,42 +106,45 @@ func (s *headerStream) Read(ctx context.Context) ([]byte, error) {
} }
colon := strings.IndexRune(line, ':') colon := strings.IndexRune(line, ':')
if colon < 0 { if colon < 0 {
return nil, fmt.Errorf("invalid header line %q", line) return nil, total, fmt.Errorf("invalid header line %q", line)
} }
name, value := line[:colon], strings.TrimSpace(line[colon+1:]) name, value := line[:colon], strings.TrimSpace(line[colon+1:])
switch name { switch name {
case "Content-Length": case "Content-Length":
if length, err = strconv.ParseInt(value, 10, 32); err != nil { if length, err = strconv.ParseInt(value, 10, 32); err != nil {
return nil, fmt.Errorf("failed parsing Content-Length: %v", value) return nil, total, fmt.Errorf("failed parsing Content-Length: %v", value)
} }
if length <= 0 { if length <= 0 {
return nil, fmt.Errorf("invalid Content-Length: %v", length) return nil, total, fmt.Errorf("invalid Content-Length: %v", length)
} }
default: default:
// ignoring unknown headers // ignoring unknown headers
} }
} }
if length == 0 { if length == 0 {
return nil, fmt.Errorf("missing Content-Length header") return nil, total, fmt.Errorf("missing Content-Length header")
} }
data := make([]byte, length) data := make([]byte, length)
if _, err := io.ReadFull(s.in, data); err != nil { if _, err := io.ReadFull(s.in, data); err != nil {
return nil, err return nil, total, err
} }
return data, nil total += length
return data, total, nil
} }
func (s *headerStream) Write(ctx context.Context, data []byte) error { func (s *headerStream) Write(ctx context.Context, data []byte) (int64, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return 0, ctx.Err()
default: default:
} }
s.outMu.Lock() s.outMu.Lock()
_, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data)) defer s.outMu.Unlock()
n, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data))
total := int64(n)
if err == nil { if err == nil {
_, err = s.out.Write(data) n, err = s.out.Write(data)
total += int64(n)
} }
s.outMu.Unlock() return total, err
return err
} }