Skip to content

Commit

Permalink
prompting: add epoll package (#12963)
Browse files Browse the repository at this point in the history
* mailmap: map new commits from [email protected] to [email protected]

Signed-off-by: Oliver Calder <[email protected]>

* Initial commit

Signed-off-by: Zygmunt Krynicki <[email protected]>
Signed-off-by: Oliver Calder <[email protected]>

* Correct references to old name

Signed-off-by: Oliver Calder <[email protected]>

* osutil/epoll: fixed epoll test import

Signed-off-by: Oliver Calder <[email protected]>

* osutil/epoll: make Close() set epoll fd to -1

Signed-off-by: Oliver Calder <[email protected]>

* epoll: renamed `FromSys()` epoll event parameter name to `ev`

Signed-off-by: Oliver Calder <[email protected]>

* epoll: added unit tests, but `Register()` always fails, as does ioctl syscall to prepare fd to register

Signed-off-by: Oliver Calder <[email protected]>

* epoll: removed unused `modeSet` type from epoll_test.go

Signed-off-by: Oliver Calder <[email protected]>

* epoll: replaced `syscall` with vendored `x/sys/unix`

Signed-off-by: Oliver Calder <[email protected]>

* epoll: simplified `Readiness` type to match `EPOLL{IN,OUT}` flags

Signed-off-by: Oliver Calder <[email protected]>

* epoll: fix unit tests using socketpair

Signed-off-by: Oliver Calder <[email protected]>

* epoll: fixed typo in test

Signed-off-by: Oliver Calder <[email protected]>

* epoll: tie events list to instance with entry for each registered fd, and add unit tests

Signed-off-by: Oliver Calder <[email protected]>

* epoll: add `WaitTimeout(msec)` with configurable timeout

Signed-off-by: Oliver Calder <[email protected]>

* epoll: remove comment about using internal/poll instead of epoll

Signed-off-by: Oliver Calder <[email protected]>

* epoll: improve error message when epoll create syscall fails

Signed-off-by: Oliver Calder <[email protected]>

* epoll: use time.Duration instead of int in `WaitTimeout()`

Signed-off-by: Oliver Calder <[email protected]>

* epoll: make epoll package thread safe

Replace the instance-specific event buffer with a mutex-guarded
registered FD count, allowing an event buffer with a number of slots
equal to the current number of registered FDs to be allocated in
`Wait()`, and thus allowing events from all FDs to be handled by a
single `Wait()` call.

Previously, the instance-specific event buffer was shared by separate
calls to `Wait()`, with the address of the start of the buffer passed
into the EPOLL_WAIT syscall, so multiple concurrent calls to `Wait()`
could cause race conditions where the contents of the buffer was
overwritten by multiple threads at once. Now, since the event buffer is
allocated within the `Wait()` function, this race condition is avoided.

Whenever a new FD is registered, the registeredFdCount variable is
incremented, and whenever an FD is deregistered, that variable is
decremented, in both cases guarded by the mutex for the epoll instance.

So what happens if there are n FDs registered, a call to `Wait()`
begins, and then while waiting, another FD is registered with the epoll
instance? Could this result in a buffer with too few entries for the now
(n+1) registered FDs? No, because beyond race conditions where the
kernel receives two write syscalls simultaneously and these end up in
the same epoll response (extremely unlikely), the only way that multiple
events are returned by the same call to `Wait()` is if there were
previous write events which had been caught by epoll but hadn't yet been
handled with an EPOLL_WAIT syscall. In this case, `Wait()` would
immediately return all those previous events, before the new FD is able
to be registered and written to.

The way around this would be if `Wait()` began, allocated the buffer
with size n, and then the thread were interrupted, and the new FD
registered and written to before the EPOLL_WAIT syscall occurred. In
this case, if all the initial n FDs had pending activity, then the
`Wait()` call would return an event buffer of size n, presumably with
the events for the original n FDs, and a subsequent call to `Wait()`
would return the event on the newly-registered FD. No event is lost in
any case.

Signed-off-by: Oliver Calder <[email protected]>

* epoll: simplified duration in `Wait()`

Signed-off-by: Oliver Calder <[email protected]>

* epoll: `s/waitMillisecondsThenWriteToFile/waitMillisecondsThenWriteToFd`

Signed-off-by: Oliver Calder <[email protected]>

* epoll: simplified timeout duration definitions in tests

Signed-off-by: Oliver Calder <[email protected]>

* epoll: gracefully handle `EINT` errors from `EPOLL_WAIT` syscall

Signed-off-by: Oliver Calder <[email protected]>

* epoll: clarified handling of EINTR from EPOLL_WAIT syscall

Signed-off-by: Oliver Calder <[email protected]>

* epoll: replaced mutex with atomic Int32 calls wrapped in helper functions

Signed-off-by: Oliver Calder <[email protected]>

* epoll: added handling for waiting with no registered FDs

It may be the case that one wishes to wait for epoll events without
first registering any file descriptors to which to listen.  Perhaps
those file descriptors will be registered later by another thread, and
we want to capture any activity on them as soon as that occurs.  In any
case, we do not wish to deny the ability to call `Wait()` if there are
no registered FDs.

However, the `EpollWait` syscall requires a nonzero-length buffer to be
passed in (technically a nonzero length value, along with a pointer to
the buffer). Thus, in order to allow future registered FDs to be handled
by an existing wait call which was initiated when no FDs were
registered, it is necessary to ensure that a buffer of at least length 1
is passed into the `EpollWait` syscall.

Unit tests were added to check for correct waiting behavior when no FDs
are registered, when waiting is initiated before FD is registered, and
when the final FD is deregistered after wait has been initiated.

The `TestWaitThenRegister` function was rewritten to be more salient. In
particular, it now tests behavior when no FDs are registered at the time
`Wait()` is called, and then a new FD is registered and activity occurs
on it.

Signed-off-by: Oliver Calder <[email protected]>

* epoll: added tests for {,de}registering bad file descriptors

Signed-off-by: Oliver Calder <[email protected]>

* epoll: improved the arbitrary nonexistent FD used by {,de}register tests

Signed-off-by: Oliver Calder <[email protected]>

* epoll: cleaned up increment/decrement order on error, and unneeded `runtime.KeepAlive()`

Signed-off-by: Oliver Calder <[email protected]>

* epoll: added early return if epoll_wait returned 0 events

Signed-off-by: Oliver Calder <[email protected]>

* epoll: clean up opened sockets, and ensure Assert not called before reading from channels

Signed-off-by: Oliver Calder <[email protected]>

* epoll: use proper checker methods when applicable

Signed-off-by: Oliver Calder <[email protected]>

* epoll: Add comment about minimum buffer size for epoll_wait

Signed-off-by: Oliver Calder <[email protected]>

* epoll: added unit test for EINTR handling

Signed-off-by: Oliver Calder <[email protected]>

* epoll: reduced test sleep and timeout durations to make unit tests faster

Signed-off-by: Oliver Calder <[email protected]>

* epoll: defined `defaultDuration` for use in timing-sensitive unit tests

Signed-off-by: Oliver Calder <[email protected]>

* epoll: replaced `Errno` magic numbers with defined values

Signed-off-by: Oliver Calder <[email protected]>

---------

Signed-off-by: Oliver Calder <[email protected]>
Signed-off-by: Zygmunt Krynicki <[email protected]>
Co-authored-by: Zygmunt Krynicki <[email protected]>
  • Loading branch information
olivercalder and zyga authored Jul 25, 2023
1 parent 4161771 commit 4d01b69
Show file tree
Hide file tree
Showing 4 changed files with 795 additions and 0 deletions.
1 change: 1 addition & 0 deletions .mailmap
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Sergio Cazzolato <[email protected]> sergio-j-cazzolato <[email protected]>
John R. Lenton <[email protected]> John Lenton <[email protected]>
Zygmunt Krynicki <[email protected]> Zygmunt Krynicki <[email protected]>
213 changes: 213 additions & 0 deletions osutil/epoll/epoll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Package epoll contains a thin wrapper around the epoll(7) facility.
//
// Using epoll from Go is unusual as the language provides facilities that
// normally make using it directly pointless. Epoll is strictly required for
// unusual kernel interfaces that use event notification but don't implement
// file descriptors that provide usual read/write semantics.

package epoll

import (
"fmt"
"runtime"
"strings"
"sync/atomic"
"time"

"golang.org/x/sys/unix"
)

// Readiness is the bit mask of aspects of readiness to monitor with epoll.
type Readiness int

const (
// Readable indicates readiness for reading (EPOLLIN).
Readable Readiness = unix.EPOLLIN
// Writable indicates readiness for writing (EPOLLOUT).
Writable Readiness = unix.EPOLLOUT
)

// String returns readable representation of the readiness flags.
func (r Readiness) String() string {
frags := make([]string, 0, 2)
if r&Readable != 0 {
frags = append(frags, "Readable")
}
if r&Writable != 0 {
frags = append(frags, "Writable")
}
return strings.Join(frags, "|")
}

// Epoll wraps a file descriptor which can be used for I/O readiness notification.
type Epoll struct {
fd int
registeredFdCount int32 // read/modify using helper functions
}

// Open opens an event monitoring descriptor.
func Open() (*Epoll, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, fmt.Errorf("cannot open epoll file descriptor: %w", err)
}
e := &Epoll{
fd: fd,
registeredFdCount: 0,
}
runtime.SetFinalizer(e, func(e *Epoll) {
if e.fd != -1 {
e.Close()
}
})
return e, nil
}

// Close closes the event monitoring descriptor.
func (e *Epoll) Close() error {
runtime.SetFinalizer(e, nil)
fd := e.fd
e.fd = -1
e.zeroRegisteredFdCount()
return unix.Close(fd)
}

// Fd returns the integer unix file descriptor referencing the open file.
func (e *Epoll) Fd() int {
return e.fd
}

// RegisteredFdCount returns the number of file descriptors which are currently
// registered to the epoll instance.
func (e *Epoll) RegisteredFdCount() int {
return int(atomic.LoadInt32(&e.registeredFdCount))
}

func (e *Epoll) incrementRegisteredFdCount() {
atomic.AddInt32(&e.registeredFdCount, 1)
}

func (e *Epoll) decrementRegisteredFdCount() {
atomic.AddInt32(&e.registeredFdCount, -1)
}

func (e *Epoll) zeroRegisteredFdCount() {
atomic.StoreInt32(&e.registeredFdCount, 0)
}

// Register registers a file descriptor and allows observing speicifc I/O readiness events.
//
// Please refer to epoll_ctl(2) and EPOLL_CTL_ADD for details.
func (e *Epoll) Register(fd int, mask Readiness) error {
e.incrementRegisteredFdCount()
err := unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, fd, &unix.EpollEvent{
Events: uint32(mask),
Fd: int32(fd),
})
if err != nil {
e.decrementRegisteredFdCount()
return err
}
runtime.KeepAlive(e)
return err
}

// Deregister removes the given file descriptor from the epoll instance.
//
// Please refer to epoll_ctl(2) and EPOLL_CTL_DEL for details.
func (e *Epoll) Deregister(fd int) error {
err := unix.EpollCtl(e.fd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
if err != nil {
return err
}
e.decrementRegisteredFdCount()
return err
}

// Modify changes the set of monitored I/O readiness events of a previously registered file descriptor.
//
// Please refer to epoll_ctl(2) and EPOLL_CTL_MOD for details.
func (e *Epoll) Modify(fd int, mask Readiness) error {
err := unix.EpollCtl(e.fd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{
Events: uint32(mask),
Fd: int32(fd),
})
runtime.KeepAlive(e)
return err
}

// Event describes an IO readiness event on a specific file descriptor.
type Event struct {
Fd int
Readiness Readiness
}

var unixEpollWait = unix.EpollWait

// WaitTimeout blocks and waits with the given timeout for arrival of events on any of the added file descriptors.
//
// A msec value of -1 disables timeout.
//
// Please refer to epoll_wait(2) and EPOLL_WAIT for details.
//
// Warning, using epoll from Golang explicitly is tricky.
func (e *Epoll) WaitTimeout(duration time.Duration) ([]Event, error) {
msec := int(duration.Milliseconds())
if duration < 0 {
msec = -1
}
n := 0
var err error
var sysEvents []unix.EpollEvent
for {
bufLen := e.RegisteredFdCount()
if bufLen < 1 {
// Even if RegisteredFdCount is zero, it could increase after a
// call in a multi-threaded environment. This ensures that there
// is at least one entry available in the event buffer. The size
// of the buffer does not need to match the number of events, and
// the syscall will populate as many buffer entries as are
// available, up to the number of epoll events which have yet to
// be handled.
bufLen = 1
}
sysEvents = make([]unix.EpollEvent, bufLen)
startTs := time.Now()
n, err = unixEpollWait(e.fd, sysEvents, msec)
runtime.KeepAlive(e)
// unix.EpollWait can return unix.EINTR, which we want to handle by
// adjusting the timeout (if necessary) and restarting the syscall
if err == nil {
break
} else if err != unix.EINTR {
return nil, err
}
if msec == -1 {
continue
}
elapsed := time.Since(startTs)
msec -= int(elapsed.Milliseconds())
if msec <= 0 {
n = 0
break
}
}
if n == 0 {
return nil, nil
}
events := make([]Event, 0, n)
for i := 0; i < n; i++ {
event := Event{
Fd: int(sysEvents[i].Fd),
Readiness: Readiness(sysEvents[i].Events),
}
events = append(events, event)
}
return events, nil
}

// Wait blocks and waits for arrival of events on any of the added file descriptors.
func (e *Epoll) Wait() ([]Event, error) {
duration := time.Duration(-1)
return e.WaitTimeout(duration)
}
Loading

0 comments on commit 4d01b69

Please sign in to comment.