Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
MakMukhi committed Mar 9, 2017
1 parent 4eaacfe commit d9b58b5
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 25 deletions.
9 changes: 9 additions & 0 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@ type ClientParameters struct {
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}

// TODO(mmukhi) : documentation
type ServerParameters struct {
MaxConnectionIdle time.Duration
MaxConnectionAge time.Duration
MaxConnectionAgeGrace time.Duration
Time time.Duration
Timeout time.Duration
}
18 changes: 14 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
Expand Down Expand Up @@ -117,13 +118,21 @@ type options struct {
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
}

var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit

// A ServerOption sets options.
type ServerOption func(*options)

// TODO(mmukhi) : Documentation
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
return func(o *options) {
o.keepaliveParams = kp
}
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
Expand Down Expand Up @@ -465,10 +474,11 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
keepaliveParams: s.opts.keepaliveParams,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
Expand Down
17 changes: 11 additions & 6 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultClientKeepaliveTime = infinity
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
defaultMaxConnectionIdle = infinity
defaultMaxConnectionAge = infinity
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
)

// The following defines various control items which could flow through
Expand Down
4 changes: 2 additions & 2 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
kp.Time = defaultKeepaliveTime
kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
kp.Timeout = defaultClientKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
Expand Down
61 changes: 61 additions & 0 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import (
"net"
"strconv"
"sync"
"time"

"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
Expand Down Expand Up @@ -90,11 +92,15 @@ type http2Server struct {

stats stats.Handler

// TODO(mmukhi): Documentation
kp keepalive.ServerParameters

mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
idle time.Time
}

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
Expand Down Expand Up @@ -128,6 +134,22 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
kp := config.keepaliveParams
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
if kp.Time == 0 {
kp.Time = defaultServerKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
Expand All @@ -149,6 +171,8 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
Expand Down Expand Up @@ -248,6 +272,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
t.mu.Unlock()
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
Expand Down Expand Up @@ -735,6 +762,37 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
}
}

// TODO(mmukhi): Documentation
func (t *http2Server) keepalive() {
//p := &ping{data: [8]byte{}}
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
keepalive := time.NewTimer(t.kp.Time)
t.mu.Lock()
idle := t.idle
t.mu.Unlock()
for {
select {
case <-maxIdle.C:
if idle == t.idle {
// send go away
continue
}
if idle.IsZero() {
maxIdle.Reset(t.kp.MaxConnectionIdle)
continue
}
maxIdle.Reset(t.kp.MaxConnectionIdle - time.Since(idle))
case <-maxAge.C:
case <-keepalive.C:
case <-t.shutdownChan:
// TODO(mmukhi): clean-up
return
}
}

}

// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
Expand Down Expand Up @@ -816,6 +874,9 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
}
if t.state == draining && len(t.activeStreams) == 0 {
defer t.Close()
}
Expand Down
9 changes: 5 additions & 4 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,11 @@ const (

// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
MaxStreams uint32
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
keepaliveParams keepalive.ServerParameters
}

// NewServerTransport creates a ServerTransport with conn or non-nil error
Expand Down
27 changes: 18 additions & 9 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stre
}

// start starts server. Other goroutines should block on s.readyChan for further operations.
func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
var err error
if port == 0 {
s.lis, err = net.Listen("tcp", "localhost:0")
Expand All @@ -180,10 +180,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
if err != nil {
return
}
config := &ServerConfig{
MaxStreams: maxStreams,
}
transport, err := NewServerTransport("http2", conn, config)
transport, err := NewServerTransport("http2", conn, serverConfig)
if err != nil {
return
}
Expand Down Expand Up @@ -252,12 +249,12 @@ func (s *server) stop() {
}

func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
}

func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
server := &server{startedErr: make(chan error, 1)}
go server.start(t, port, maxStreams, ht)
go server.start(t, port, serverConfig, ht)
server.wait(t, 2*time.Second)
addr := "localhost:" + server.port
var (
Expand Down Expand Up @@ -301,6 +298,18 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
return tr
}

// TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
// An idle client is one who doesn't make any RPC calls for a duration of
// MaxConnectionIdle time.
func TestMaxConnectionIdle(t *testing.T) {
serverConfig := &ServerConfig{
keepaliveParams: keepalive.ServerParams{
MaxConnectionIdle: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{})
}

func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Expand Down Expand Up @@ -377,7 +386,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
}

func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
Expand Down

0 comments on commit d9b58b5

Please sign in to comment.