forked from gravitational/teleport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathctx.go
351 lines (300 loc) · 10.3 KB
/
ctx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
/*
* Teleport
* Copyright (C) 2023 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package sshutils
import (
"context"
"io"
"net"
"sync"
"time"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/agent"
"github.com/gravitational/teleport/lib/teleagent"
"github.com/gravitational/teleport/lib/utils/uds"
)
// TCPIPForwardDialer represents a dialer used to handle TCPIP forward requests.
type TCPIPForwardDialer func(string) (net.Conn, error)
// TCPIPForwardProcess represents an instance of a port forwarding process.
type TCPIPForwardProcess struct {
// Conn is the socket used to request a dialer or listener in the process.
Conn *uds.Conn
// Done signals when the process completes.
Done <-chan struct{}
// Closer contains and extra io.Closer to run when the process as a whole
// is closed.
Closer io.Closer
}
// Close stops the process and frees up its related resources.
func (p *TCPIPForwardProcess) Close() error {
var errs []error
if p.Conn != nil {
errs = append(errs, p.Conn.Close())
}
if p.Closer != nil {
errs = append(errs, p.Closer.Close())
}
return trace.NewAggregate(errs...)
}
// ConnectionContext manages connection-level state.
type ConnectionContext struct {
// NetConn is the base connection object.
NetConn net.Conn
// ServerConn is authenticated ssh connection.
ServerConn *ssh.ServerConn
// mu protects the rest of the state
mu sync.RWMutex
// env holds environment variables which should be
// set for all channels.
env map[string]string
// forwardAgent indicates that agent forwarding has
// been requested for this connection.
forwardAgent bool
// sessions is the number of currently active session channels; only tracked
// when handling node-side connections for users with MaxSessions applied.
sessions int64
// tcpipForwardDialer is a lazily initialized dialer used to handle all tcpip
// forwarding requests.
tcpipForwardDialer TCPIPForwardDialer
// tcpipForwardProcess is a lazily initialized connection to the subprocess that
// handles remote port forwarding.
tcpipForwardProcess *TCPIPForwardProcess
// closers is a list of io.Closer that will be called when session closes
// this is handy as sometimes client closes session, in this case resources
// will be properly closed and deallocated, otherwise they could be kept hanging.
closers []io.Closer
// closed indicates that closers have been run.
closed bool
// cancel cancels the context.Context scope associated with this ConnectionContext.
cancel context.CancelFunc
// clientLastActive records the last time there was activity from the client.
clientLastActive time.Time
clock clockwork.Clock
}
type ConnectionContextOption func(c *ConnectionContext)
// SetConnectionContextClock sets the connection context's internal clock.
func SetConnectionContextClock(clock clockwork.Clock) ConnectionContextOption {
return func(c *ConnectionContext) {
c.clock = clock
}
}
// NewConnectionContext creates a new ConnectionContext and a child context.Context
// instance which will be canceled when the ConnectionContext is closed.
func NewConnectionContext(ctx context.Context, nconn net.Conn, sconn *ssh.ServerConn, opts ...ConnectionContextOption) (context.Context, *ConnectionContext) {
ctx, cancel := context.WithCancel(ctx)
ccx := &ConnectionContext{
NetConn: nconn,
ServerConn: sconn,
env: make(map[string]string),
cancel: cancel,
clock: clockwork.NewRealClock(),
}
for _, opt := range opts {
opt(ccx)
}
return ctx, ccx
}
// agentChannel implements the extended teleteleagent.Agent interface,
// allowing the underlying ssh.Channel to be closed when the agent
// is no longer needed.
type agentChannel struct {
agent.ExtendedAgent
ch ssh.Channel
}
// Close closes the agent channel.
func (a *agentChannel) Close() error {
// For graceful teardown, close the write part of the channel first. This
// will send "EOF" packet (type 96) to the other side which will drain and
// close the channel.
//
// The regular close after that will send "close" packet (type 97) which
// won't attempt to send us any more data since the channel is already
// closed.
//
// This mimics vanilla OpenSSH behavior. Without close_write first, the
// agent client may be getting warnings like the following in stdout:
//
// channel 1: chan_shutdown_read: shutdown() failed for fd 8 [i0 o1]: Not a socket
return trace.NewAggregate(
a.ch.CloseWrite(),
a.ch.Close())
}
// StartAgentChannel sets up a new agent forwarding channel against this connection. The channel
// is automatically closed when either ConnectionContext, or the supplied context.Context
// gets canceled.
func (c *ConnectionContext) StartAgentChannel() (teleagent.Agent, error) {
// refuse to start an agent if forwardAgent has not yet been set.
if !c.GetForwardAgent() {
return nil, trace.AccessDenied("agent forwarding has not been requested")
}
// open a agent channel to client
ch, reqC, err := c.ServerConn.OpenChannel(AuthAgentRequest, nil)
if err != nil {
return nil, trace.Wrap(err)
}
go ssh.DiscardRequests(reqC)
return &agentChannel{
ExtendedAgent: agent.NewClient(ch),
ch: ch,
}, nil
}
// VisitEnv grants visitor-style access to env variables.
func (c *ConnectionContext) VisitEnv(visit func(key, val string)) {
c.mu.Lock()
defer c.mu.Unlock()
for key, val := range c.env {
visit(key, val)
}
}
// SetEnv sets a environment variable within this context.
func (c *ConnectionContext) SetEnv(key, val string) {
c.mu.Lock()
defer c.mu.Unlock()
c.env[key] = val
}
// GetEnv returns a environment variable within this context.
func (c *ConnectionContext) GetEnv(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.env[key]
return val, ok
}
// SetForwardAgent configures this context to support agent forwarding.
// Must not be set until agent forwarding is explicitly requested.
func (c *ConnectionContext) SetForwardAgent(forwardAgent bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.forwardAgent = forwardAgent
}
// GetForwardAgent loads the forwardAgent flag with lock.
func (c *ConnectionContext) GetForwardAgent() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.forwardAgent
}
// TryIncrSessions tries to increment the active session count; if ok the
// returned decr function *must* be called when the associated session is closed.
func (c *ConnectionContext) IncrSessions(max int64) (decr func(), ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.sessions >= max {
return func() {}, false
}
c.sessions++
var decrOnce sync.Once
return func() {
decrOnce.Do(c.decrSessions)
}, true
}
func (c *ConnectionContext) decrSessions() {
c.mu.Lock()
defer c.mu.Unlock()
c.sessions--
if c.sessions < 0 {
panic("underflow")
}
}
// GetClientLastActive returns time when client was last active.
func (c *ConnectionContext) GetClientLastActive() time.Time {
c.mu.RLock()
defer c.mu.RUnlock()
return c.clientLastActive
}
// UpdateClientActivity sets last recorded client activity associated with this context.
func (c *ConnectionContext) UpdateClientActivity() {
c.mu.Lock()
defer c.mu.Unlock()
c.clientLastActive = c.clock.Now().UTC()
}
// TrySetDirectTCPIPForwardDialer attempts to registers a DirectTCPIPForwardDialer. If a different dialer was
// concurrently registered, ok is false and the previously registered dialer is returned.
func (c *ConnectionContext) TrySetDirectTCPIPForwardDialer(d TCPIPForwardDialer) (registered TCPIPForwardDialer, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.tcpipForwardDialer != nil {
return c.tcpipForwardDialer, false
}
c.tcpipForwardDialer = d
return c.tcpipForwardDialer, true
}
// GetDirectTCPIPForwardDialer gets the registered DirectTCPIPForwardDialer if one exists.
func (c *ConnectionContext) GetDirectTCPIPForwardDialer() (d TCPIPForwardDialer, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
return c.tcpipForwardDialer, c.tcpipForwardDialer != nil
}
// AddCloser adds any closer in ctx that will be called
// when the underlying connection is closed.
func (c *ConnectionContext) AddCloser(closer io.Closer) {
c.mu.Lock()
defer c.mu.Unlock()
// if context was already closed, run the closer immediately
// in the background.
if c.closed {
go closer.Close()
return
}
c.closers = append(c.closers, closer)
}
// TrySetTCPIPForwardProcess attempts to registers a TCPIPForwardProcess. If a
// different process was concurrently registered, ok is false and the previously
// registered process is returned.
func (c *ConnectionContext) TrySetTCPIPForwardProcess(proc *TCPIPForwardProcess) (*TCPIPForwardProcess, bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.tcpipForwardProcess != nil {
return c.tcpipForwardProcess, false
}
c.tcpipForwardProcess = proc
return proc, true
}
// GetTCPIPForwardProcess gets the registered TCPIPForwardProcess if one exists.
func (c *ConnectionContext) GetTCPIPForwardProcess() (*TCPIPForwardProcess, bool) {
c.mu.Lock()
defer c.mu.Unlock()
return c.tcpipForwardProcess, c.tcpipForwardProcess != nil
}
// takeClosers returns all resources that should be closed and sets the properties to null
// we do this to avoid calling Close() under lock to avoid potential deadlocks
func (c *ConnectionContext) takeClosers() []io.Closer {
// this is done to avoid any operation holding the lock for too long
c.mu.Lock()
defer c.mu.Unlock()
closers := c.closers
c.closers = nil
c.closed = true
return closers
}
// Close closes associated resources (e.g. agent channel).
func (c *ConnectionContext) Close() error {
var errs []error
c.cancel()
closers := c.takeClosers()
for _, cl := range closers {
if cl == nil {
continue
}
err := cl.Close()
if err == nil {
continue
}
errs = append(errs, err)
}
return trace.NewAggregate(errs...)
}