diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index 3796b44c90..db4a73b8f7 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -11,10 +11,14 @@ import ( "context" "encoding/json" "fmt" - "runtime/trace" "sync" "sync/atomic" "time" + + "golang.org/x/tools/internal/lsp/telemetry" + "golang.org/x/tools/internal/lsp/telemetry/stats" + "golang.org/x/tools/internal/lsp/telemetry/tag" + "golang.org/x/tools/internal/lsp/telemetry/trace" ) // Conn is a JSON RPC 2 client server connection. @@ -35,9 +39,10 @@ type Conn struct { } type queueEntry struct { - ctx context.Context - c *Conn - r *Request + ctx context.Context + c *Conn + r *Request + size int64 } // Handler is an option you can pass to NewConn to handle incoming requests. @@ -56,6 +61,64 @@ type Handler func(context.Context, *Conn, *Request) // instead. type Canceler func(context.Context, *Conn, *Request) +type rpcStats struct { + server bool + method string + ctx context.Context + span trace.Span + start time.Time + received int64 + sent int64 +} + +type statsKeyType string + +const rpcStatsKey = statsKeyType("rpcStatsKey") + +func start(ctx context.Context, server bool, method string, id *ID) (context.Context, *rpcStats) { + s := &rpcStats{ + server: server, + method: method, + ctx: ctx, + start: time.Now(), + } + s.ctx = context.WithValue(s.ctx, rpcStatsKey, s) + tags := make([]tag.Mutator, 0, 4) + tags = append(tags, tag.Upsert(telemetry.KeyMethod, method)) + mode := telemetry.Outbound + spanKind := trace.SpanKindClient + if server { + spanKind = trace.SpanKindServer + mode = telemetry.Inbound + } + tags = append(tags, tag.Upsert(telemetry.KeyRPCDirection, mode)) + if id != nil { + tags = append(tags, tag.Upsert(telemetry.KeyRPCID, id.String())) + } + s.ctx, s.span = trace.StartSpan(ctx, method, trace.WithSpanKind(spanKind)) + s.ctx, _ = tag.New(s.ctx, tags...) + stats.Record(s.ctx, telemetry.Started.M(1)) + return s.ctx, s +} + +func (s *rpcStats) end(ctx context.Context, err *error) { + if err != nil && *err != nil { + ctx, _ = tag.New(ctx, tag.Upsert(telemetry.KeyStatus, "ERROR")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(telemetry.KeyStatus, "OK")) + } + elapsedTime := time.Since(s.start) + latencyMillis := float64(elapsedTime) / float64(time.Millisecond) + + stats.Record(ctx, + telemetry.ReceivedBytes.M(s.received), + telemetry.SentBytes.M(s.sent), + telemetry.Latency.M(latencyMillis), + ) + + s.span.End() +} + // NewErrorf builds a Error struct for the suppied message and code. // If args is not empty, message and args will be passed to Sprintf. func NewErrorf(code int64, format string, args ...interface{}) *Error { @@ -103,9 +166,10 @@ func (c *Conn) Cancel(id ID) { // Notify is called to send a notification request over the connection. // It will return as soon as the notification has been sent, as no response is // possible. -func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error { - ctx, task := trace.NewTask(ctx, "jsonrpc2.Notify "+method) - defer task.End() +func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (err error) { + ctx, rpcStats := start(ctx, false, method, nil) + defer rpcStats.end(ctx, &err) + jsonParams, err := marshalToRaw(params) if err != nil { return fmt.Errorf("marshalling notify parameters: %v", err) @@ -119,22 +183,23 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}) er return fmt.Errorf("marshalling notify request: %v", err) } c.Logger(Send, nil, -1, request.Method, request.Params, nil) - return c.stream.Write(ctx, data) + n, err := c.stream.Write(ctx, data) + rpcStats.sent += n + return err } // Call sends a request over the connection and then waits for a response. // If the response is not an error, it will be decoded into result. // result must be of a type you an pass to json.Unmarshal. -func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error { - ctx, task := trace.NewTask(ctx, "jsonrpc2.Call "+method) - defer task.End() +func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) (err error) { + // generate a new request identifier + id := ID{Number: atomic.AddInt64(&c.seq, 1)} + ctx, rpcStats := start(ctx, false, method, &id) + defer rpcStats.end(ctx, &err) jsonParams, err := marshalToRaw(params) if err != nil { return fmt.Errorf("marshalling call parameters: %v", err) } - // generate a new request identifier - id := ID{Number: atomic.AddInt64(&c.seq, 1)} - trace.Logf(ctx, "jsonrpc2", "request id %v", id) request := &Request{ ID: &id, Method: method, @@ -160,7 +225,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface // now we are ready to send before := time.Now() c.Logger(Send, request.ID, -1, request.Method, request.Params, nil) - if err := c.stream.Write(ctx, data); err != nil { + n, err := c.stream.Write(ctx, data) + rpcStats.sent += n + if err != nil { // sending failed, we will never get a response, so don't leave it pending return err } @@ -192,8 +259,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface // You must call this exactly once for any given request. // If err is set then result will be ignored. func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err error) error { - ctx, task := trace.NewTask(ctx, "jsonrpc2.Reply "+req.Method) - defer task.End() + ctx, st := trace.StartSpan(ctx, req.Method+":reply", trace.WithSpanKind(trace.SpanKindClient)) + defer st.End() + if req.IsNotify() { return fmt.Errorf("reply not invoked with a valid call") } @@ -228,7 +296,17 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err return err } c.Logger(Send, response.ID, elapsed, req.Method, response.Result, response.Error) - if err = c.stream.Write(ctx, data); err != nil { + n, err := c.stream.Write(ctx, data) + + v := ctx.Value(rpcStatsKey) + if v != nil { + s := v.(*rpcStats) + s.sent += n + } else { + //panic("no stats available in reply") + } + + if err != nil { // TODO(iancottrell): if a stream write fails, we really need to shut down // the whole stream return err @@ -253,8 +331,8 @@ type combined struct { Error *Error `json:"error,omitempty"` } -func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request) bool { - e := queueEntry{ctx: ctx, c: c, r: request} +func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request, size int64) bool { + e := queueEntry{ctx: ctx, c: c, r: request, size: size} if !c.RejectIfOverloaded { q <- e return true @@ -281,17 +359,15 @@ func (c *Conn) Run(ctx context.Context) error { if e.ctx.Err() != nil { continue } - ctx, task := trace.NewTask(e.ctx, "jsonrpc2.Handle "+e.r.Method) - if !e.r.IsNotify() { - trace.Logf(ctx, "jsonrpc2", "request id %v", e.r.ID) - } + ctx, rpcStats := start(ctx, true, e.r.Method, e.r.ID) + rpcStats.received += e.size c.Handler(ctx, e.c, e.r) - task.End() + rpcStats.end(ctx, nil) } }() for { // get the data for a message - data, err := c.stream.Read(ctx) + data, n, err := c.stream.Read(ctx) if err != nil { // the stream failed, we cannot continue return err @@ -316,7 +392,7 @@ func (c *Conn) Run(ctx context.Context) error { if request.IsNotify() { c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil) // we have a Notify, add to the processor queue - c.deliver(ctx, q, request) + c.deliver(ctx, q, request, n) //TODO: log when we drop a message? } else { // we have a Call, add to the processor queue @@ -329,7 +405,7 @@ func (c *Conn) Run(ctx context.Context) error { } c.handlingMu.Unlock() c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil) - if !c.deliver(reqCtx, q, request) { + if !c.deliver(reqCtx, q, request, n) { // queue is full, reject the message by directly replying c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue")) } diff --git a/internal/jsonrpc2/stream.go b/internal/jsonrpc2/stream.go index fe28c55a9a..f850c27c79 100644 --- a/internal/jsonrpc2/stream.go +++ b/internal/jsonrpc2/stream.go @@ -22,10 +22,10 @@ import ( type Stream interface { // Read gets the next message from the stream. // It is never called concurrently. - Read(context.Context) ([]byte, error) + Read(context.Context) ([]byte, int64, error) // Write sends a message to the stream. // It must be safe for concurrent use. - Write(context.Context, []byte) error + Write(context.Context, []byte) (int64, error) } // NewStream returns a Stream built on top of an io.Reader and io.Writer @@ -44,29 +44,29 @@ type plainStream struct { out io.Writer } -func (s *plainStream) Read(ctx context.Context) ([]byte, error) { +func (s *plainStream) Read(ctx context.Context) ([]byte, int64, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, 0, ctx.Err() default: } var raw json.RawMessage if err := s.in.Decode(&raw); err != nil { - return nil, err + return nil, 0, err } - return raw, nil + return raw, int64(len(raw)), nil } -func (s *plainStream) Write(ctx context.Context, data []byte) error { +func (s *plainStream) Write(ctx context.Context, data []byte) (int64, error) { select { case <-ctx.Done(): - return ctx.Err() + return 0, ctx.Err() default: } s.outMu.Lock() - _, err := s.out.Write(data) + n, err := s.out.Write(data) s.outMu.Unlock() - return err + return int64(n), err } // NewHeaderStream returns a Stream built on top of an io.Reader and io.Writer @@ -85,18 +85,19 @@ type headerStream struct { out io.Writer } -func (s *headerStream) Read(ctx context.Context) ([]byte, error) { +func (s *headerStream) Read(ctx context.Context) ([]byte, int64, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, 0, ctx.Err() default: } - var length int64 + var total, length int64 // read the header, stop on the first empty line for { line, err := s.in.ReadString('\n') + total += int64(len(line)) if err != nil { - return nil, fmt.Errorf("failed reading header line %q", err) + return nil, total, fmt.Errorf("failed reading header line %q", err) } line = strings.TrimSpace(line) // check we have a header line @@ -105,42 +106,45 @@ func (s *headerStream) Read(ctx context.Context) ([]byte, error) { } colon := strings.IndexRune(line, ':') if colon < 0 { - return nil, fmt.Errorf("invalid header line %q", line) + return nil, total, fmt.Errorf("invalid header line %q", line) } name, value := line[:colon], strings.TrimSpace(line[colon+1:]) switch name { case "Content-Length": if length, err = strconv.ParseInt(value, 10, 32); err != nil { - return nil, fmt.Errorf("failed parsing Content-Length: %v", value) + return nil, total, fmt.Errorf("failed parsing Content-Length: %v", value) } if length <= 0 { - return nil, fmt.Errorf("invalid Content-Length: %v", length) + return nil, total, fmt.Errorf("invalid Content-Length: %v", length) } default: // ignoring unknown headers } } if length == 0 { - return nil, fmt.Errorf("missing Content-Length header") + return nil, total, fmt.Errorf("missing Content-Length header") } data := make([]byte, length) if _, err := io.ReadFull(s.in, data); err != nil { - return nil, err + return nil, total, err } - return data, nil + total += length + return data, total, nil } -func (s *headerStream) Write(ctx context.Context, data []byte) error { +func (s *headerStream) Write(ctx context.Context, data []byte) (int64, error) { select { case <-ctx.Done(): - return ctx.Err() + return 0, ctx.Err() default: } s.outMu.Lock() - _, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data)) + defer s.outMu.Unlock() + n, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data)) + total := int64(n) if err == nil { - _, err = s.out.Write(data) + n, err = s.out.Write(data) + total += int64(n) } - s.outMu.Unlock() - return err + return total, err }