diff --git a/playground/socket/socket.go b/playground/socket/socket.go index 87abf56371..cec842e54b 100644 --- a/playground/socket/socket.go +++ b/playground/socket/socket.go @@ -255,25 +255,28 @@ func limiter(in <-chan *Message, p killer) <-chan *Message { // buffer returns a channel that wraps the given channel. It receives messages // from the given channel and sends them to the returned channel. // Message bodies are gathered over the period msgDelay and coalesced into a -// single Message before they are passed on. -// When the given channel is closed, buffer flushes the remaining buffered -// messages and closes the returned channel. +// single Message before they are passed on. Messages of the same kind are +// coalesced; when a message of a different kind is received, any buffered +// messages are flushed. When the given channel is closed, buffer flushes the +// remaining buffered messages and closes the returned channel. func buffer(in <-chan *Message) <-chan *Message { out := make(chan *Message) go func() { defer close(out) - buf := make(map[string][]byte) // [kind]buffer - flush := func() { - for kind, b := range buf { - if len(b) == 0 { - continue + var ( + t = time.NewTimer(msgDelay) + tc <-chan time.Time + buf []byte + kind string + flush = func() { + if len(buf) == 0 { + return } - out <- &Message{Kind: kind, Body: safeString(b)} - buf[kind] = b[:0] // recycle buffer + out <- &Message{Kind: kind, Body: safeString(buf)} + buf = buf[:0] // recycle buffer + kind = "" } - } - t := time.NewTimer(msgDelay) - var tc <-chan time.Time + ) for { select { case m, ok := <-in: @@ -286,11 +289,15 @@ func buffer(in <-chan *Message) <-chan *Message { out <- m return } - buf[m.Kind] = append(buf[m.Kind], m.Body...) - if tc == nil { - tc = t.C - t.Reset(msgDelay) + if kind != m.Kind { + flush() + kind = m.Kind + if tc == nil { + tc = t.C + t.Reset(msgDelay) + } } + buf = append(buf, m.Body...) case <-tc: flush() tc = nil diff --git a/playground/socket/socket_test.go b/playground/socket/socket_test.go new file mode 100644 index 0000000000..5dd2815ee4 --- /dev/null +++ b/playground/socket/socket_test.go @@ -0,0 +1,73 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package socket + +import ( + "testing" + "time" +) + +func TestBuffer(t *testing.T) { + ch := make(chan *Message) + go func() { + ch <- &Message{Kind: "err", Body: "a"} + ch <- &Message{Kind: "err", Body: "b"} + ch <- &Message{Kind: "out", Body: "1"} + ch <- &Message{Kind: "out", Body: "2"} + time.Sleep(msgDelay * 2) + ch <- &Message{Kind: "out", Body: "3"} + ch <- &Message{Kind: "out", Body: "4"} + close(ch) + }() + + var ms []*Message + for m := range buffer(ch) { + ms = append(ms, m) + } + if len(ms) != 3 { + t.Fatalf("got %v messages, want 2", len(ms)) + } + if g, w := ms[0].Body, "ab"; g != w { + t.Errorf("message 0 body = %q, want %q", g, w) + } + if g, w := ms[1].Body, "12"; g != w { + t.Errorf("message 1 body = %q, want %q", g, w) + } + if g, w := ms[2].Body, "34"; g != w { + t.Errorf("message 2 body = %q, want %q", g, w) + } +} + +type killRecorder chan struct{} + +func (k killRecorder) Kill() { close(k) } + +func TestLimiter(t *testing.T) { + ch := make(chan *Message) + go func() { + var m Message + for i := 0; i < msgLimit+10; i++ { + ch <- &m + } + ch <- &Message{Kind: "end"} + }() + + kr := make(killRecorder) + n := 0 + for m := range limiter(ch, kr) { + n++ + if n > msgLimit && m.Kind != "end" { + t.Errorf("received non-end message after limit") + } + } + if n != msgLimit+1 { + t.Errorf("received %v messages, want %v", n, msgLimit+1) + } + select { + case <-kr: + case <-time.After(100 * time.Millisecond): + t.Errorf("process wasn't killed after reaching limit") + } +}