From 91ad4fb77b05cf5b4a413d2b4b67aa7dee6e9f60 Mon Sep 17 00:00:00 2001 From: Aaron Jheng Date: Sun, 10 Nov 2024 12:10:43 +0800 Subject: [PATCH 01/20] Specify a custom dial function per config (#1527) Specify a custom dial function per config instead of using RegisterDialContext. --- connector.go | 31 ++++++++++++++++++------------- dsn.go | 2 ++ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/connector.go b/connector.go index 62012dba..769b3adc 100644 --- a/connector.go +++ b/connector.go @@ -87,20 +87,25 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { mc.parseTime = mc.cfg.ParseTime // Connect to Server - dialsLock.RLock() - dial, ok := dials[mc.cfg.Net] - dialsLock.RUnlock() - if ok { - dctx := ctx - if mc.cfg.Timeout > 0 { - var cancel context.CancelFunc - dctx, cancel = context.WithTimeout(ctx, c.cfg.Timeout) - defer cancel() - } - mc.netConn, err = dial(dctx, mc.cfg.Addr) + dctx := ctx + if mc.cfg.Timeout > 0 { + var cancel context.CancelFunc + dctx, cancel = context.WithTimeout(ctx, c.cfg.Timeout) + defer cancel() + } + + if c.cfg.DialFunc != nil { + mc.netConn, err = c.cfg.DialFunc(dctx, mc.cfg.Net, mc.cfg.Addr) } else { - nd := net.Dialer{Timeout: mc.cfg.Timeout} - mc.netConn, err = nd.DialContext(ctx, mc.cfg.Net, mc.cfg.Addr) + dialsLock.RLock() + dial, ok := dials[mc.cfg.Net] + dialsLock.RUnlock() + if ok { + mc.netConn, err = dial(dctx, mc.cfg.Addr) + } else { + nd := net.Dialer{} + mc.netConn, err = nd.DialContext(dctx, mc.cfg.Net, mc.cfg.Addr) + } } if err != nil { return nil, err diff --git a/dsn.go b/dsn.go index 3c7a6e21..f391a8fc 100644 --- a/dsn.go +++ b/dsn.go @@ -55,6 +55,8 @@ type Config struct { ReadTimeout time.Duration // I/O read timeout WriteTimeout time.Duration // I/O write timeout Logger Logger // Logger + // DialFunc specifies the dial function for creating connections + DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) // boolean fields From fc64d3f08fb84395f911a6a23a266db92ac8a7e1 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Mon, 11 Nov 2024 11:14:04 +0900 Subject: [PATCH 02/20] ci: update Go and staticcheck versions (#1639) - Add Go 1.23 support - Remove Go 1.20 support - Update staticcheck action --- .github/workflows/test.yml | 8 +++----- README.md | 2 +- go.mod | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index df37eab5..b1c1f2b3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,9 +15,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: dominikh/staticcheck-action@v1.3.0 - with: - version: "2023.1.6" + - uses: dominikh/staticcheck-action@v1.3.1 list: runs-on: ubuntu-latest @@ -31,10 +29,10 @@ jobs: import os go = [ # Keep the most recent production release at the top - '1.22', + '1.23', # Older production releases + '1.22', '1.21', - '1.20', ] mysql = [ '9.0', diff --git a/README.md b/README.md index c83f4f74..e9d9222b 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ A MySQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/) pac ## Requirements -* Go 1.20 or higher. We aim to support the 3 latest versions of Go. +* Go 1.21 or higher. We aim to support the 3 latest versions of Go. * MySQL (5.7+) and MariaDB (10.5+) are supported. * [TiDB](https://github.com/pingcap/tidb) is supported by PingCAP. * Do not ask questions about TiDB in our issue tracker or forum. diff --git a/go.mod b/go.mod index 2eed53eb..33c4dd5b 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/go-sql-driver/mysql -go 1.20 +go 1.21 require filippo.io/edwards25519 v1.1.0 From f62f523d2458d82587f03e9357396a9c8a93fcba Mon Sep 17 00:00:00 2001 From: KratkyZobak Date: Mon, 11 Nov 2024 03:14:49 +0100 Subject: [PATCH 03/20] Fix auth errors when username/password are too long (#1482) (#1625) --- AUTHORS | 1 + packets.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/AUTHORS b/AUTHORS index 287176fb..a9860850 100644 --- a/AUTHORS +++ b/AUTHORS @@ -51,6 +51,7 @@ ICHINOSE Shogo Ilia Cimpoes INADA Naoki Jacek Szwec +Jakub Adamus James Harr Janek Vedock Jason Ng diff --git a/packets.go b/packets.go index 014a1dee..eb4e0cef 100644 --- a/packets.go +++ b/packets.go @@ -392,7 +392,7 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string // http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchResponse func (mc *mysqlConn) writeAuthSwitchPacket(authData []byte) error { pktLen := 4 + len(authData) - data, err := mc.buf.takeSmallBuffer(pktLen) + data, err := mc.buf.takeBuffer(pktLen) if err != nil { // cannot take the buffer. Something must be wrong with the connection mc.log(err) From 41a5fa29f2f73060c426547f762dc49b62e1f2a5 Mon Sep 17 00:00:00 2001 From: raffertyyu Date: Tue, 19 Nov 2024 12:09:49 +0800 Subject: [PATCH 04/20] Check if MySQL supports CLIENT_CONNECT_ATTRS before sending client attributes. (#1640) --- packets.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/packets.go b/packets.go index eb4e0cef..a2e7ef95 100644 --- a/packets.go +++ b/packets.go @@ -210,10 +210,13 @@ func (mc *mysqlConn) readHandshakePacket() (data []byte, plugin string, err erro if len(data) > pos { // character set [1 byte] // status flags [2 bytes] + pos += 3 // capability flags (upper 2 bytes) [2 bytes] + mc.flags |= clientFlag(binary.LittleEndian.Uint16(data[pos:pos+2])) << 16 + pos += 2 // length of auth-plugin-data [1 byte] // reserved (all [00]) [10 bytes] - pos += 1 + 2 + 2 + 1 + 10 + pos += 11 // second part of the password cipher [minimum 13 bytes], // where len=MAX(13, length of auth-plugin-data - 8) @@ -261,9 +264,11 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string clientLocalFiles | clientPluginAuth | clientMultiResults | - clientConnectAttrs | + mc.flags&clientConnectAttrs | mc.flags&clientLongFlag + sendConnectAttrs := mc.flags&clientConnectAttrs != 0 + if mc.cfg.ClientFoundRows { clientFlags |= clientFoundRows } @@ -296,10 +301,13 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string } // encode length of the connection attributes - var connAttrsLEIBuf [9]byte - connAttrsLen := len(mc.connector.encodedAttributes) - connAttrsLEI := appendLengthEncodedInteger(connAttrsLEIBuf[:0], uint64(connAttrsLen)) - pktLen += len(connAttrsLEI) + len(mc.connector.encodedAttributes) + var connAttrsLEI []byte + if sendConnectAttrs { + var connAttrsLEIBuf [9]byte + connAttrsLen := len(mc.connector.encodedAttributes) + connAttrsLEI = appendLengthEncodedInteger(connAttrsLEIBuf[:0], uint64(connAttrsLen)) + pktLen += len(connAttrsLEI) + len(mc.connector.encodedAttributes) + } // Calculate packet length and get buffer with that size data, err := mc.buf.takeBuffer(pktLen + 4) @@ -382,8 +390,10 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string pos++ // Connection Attributes - pos += copy(data[pos:], connAttrsLEI) - pos += copy(data[pos:], []byte(mc.connector.encodedAttributes)) + if sendConnectAttrs { + pos += copy(data[pos:], connAttrsLEI) + pos += copy(data[pos:], []byte(mc.connector.encodedAttributes)) + } // Send Auth packet return mc.writePacket(data[:pos]) From 9c8d6a5ddc5b4c2a658e77cb4d03583327901ca5 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Fri, 22 Nov 2024 00:48:25 +0900 Subject: [PATCH 05/20] Reduce "busy buffer" logs (#1641) Reduce the use of `errBadConnNoWrite` to improve maintainability. ResetSession() and IsValid() checks if the buffer is busy. This reduces the risk of busy buffer error during connection in use. In principle, the risk of this is zero. So I removed errBadConnNoWrite when checking the busy buffer. After this change, only `writePacke()` returns errBadConnNoWrite. Additionally, I do not send COM_QUIT when readPacket() encounter read error. It caused "busy buffer" error too and hide real errors. --- buffer.go | 5 +++++ connection.go | 10 +++++++--- packets.go | 44 ++++++++++++++------------------------------ 3 files changed, 26 insertions(+), 33 deletions(-) diff --git a/buffer.go b/buffer.go index 0774c5c8..d3d009cc 100644 --- a/buffer.go +++ b/buffer.go @@ -43,6 +43,11 @@ func newBuffer(nc net.Conn) buffer { } } +// busy returns true if the buffer contains some read data. +func (b *buffer) busy() bool { + return b.length > 0 +} + // flip replaces the active buffer with the background buffer // this is a delayed flip that simply increases the buffer counter; // the actual flip will be performed the next time we call `buffer.fill` diff --git a/connection.go b/connection.go index ef6fc9e4..c220a836 100644 --- a/connection.go +++ b/connection.go @@ -121,10 +121,14 @@ func (mc *mysqlConn) Close() (err error) { if !mc.closed.Load() { err = mc.writeCommandPacket(comQuit) } + mc.close() + return +} +// close closes the network connection and clear results without sending COM_QUIT. +func (mc *mysqlConn) close() { mc.cleanup() mc.clearResult() - return } // Closes the network connection and unsets internal variables. Do not call this @@ -637,7 +641,7 @@ func (mc *mysqlConn) CheckNamedValue(nv *driver.NamedValue) (err error) { // ResetSession implements driver.SessionResetter. // (From Go 1.10) func (mc *mysqlConn) ResetSession(ctx context.Context) error { - if mc.closed.Load() { + if mc.closed.Load() || mc.buf.busy() { return driver.ErrBadConn } @@ -671,7 +675,7 @@ func (mc *mysqlConn) ResetSession(ctx context.Context) error { // IsValid implements driver.Validator interface // (From Go 1.15) func (mc *mysqlConn) IsValid() bool { - return !mc.closed.Load() + return !mc.closed.Load() && !mc.buf.busy() } var _ driver.SessionResetter = &mysqlConn{} diff --git a/packets.go b/packets.go index a2e7ef95..4695fb81 100644 --- a/packets.go +++ b/packets.go @@ -32,11 +32,11 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { // read packet header data, err := mc.buf.readNext(4) if err != nil { + mc.close() if cerr := mc.canceled.Value(); cerr != nil { return nil, cerr } mc.log(err) - mc.Close() return nil, ErrInvalidConn } @@ -45,7 +45,7 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { // check packet sync [8 bit] if data[3] != mc.sequence { - mc.Close() + mc.close() if data[3] > mc.sequence { return nil, ErrPktSyncMul } @@ -59,7 +59,7 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { // there was no previous packet if prevData == nil { mc.log(ErrMalformPkt) - mc.Close() + mc.close() return nil, ErrInvalidConn } @@ -69,11 +69,11 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { // read packet body [pktLen bytes] data, err = mc.buf.readNext(pktLen) if err != nil { + mc.close() if cerr := mc.canceled.Value(); cerr != nil { return nil, cerr } mc.log(err) - mc.Close() return nil, ErrInvalidConn } @@ -125,10 +125,10 @@ func (mc *mysqlConn) writePacket(data []byte) error { n, err := mc.netConn.Write(data[:4+size]) if err != nil { + mc.cleanup() if cerr := mc.canceled.Value(); cerr != nil { return cerr } - mc.cleanup() if n == 0 && pktLen == len(data)-4 { // only for the first loop iteration when nothing was written yet mc.log(err) @@ -162,11 +162,6 @@ func (mc *mysqlConn) writePacket(data []byte) error { func (mc *mysqlConn) readHandshakePacket() (data []byte, plugin string, err error) { data, err = mc.readPacket() if err != nil { - // for init we can rewrite this to ErrBadConn for sql.Driver to retry, since - // in connection initialization we don't risk retrying non-idempotent actions. - if err == ErrInvalidConn { - return nil, "", driver.ErrBadConn - } return } @@ -312,9 +307,8 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string // Calculate packet length and get buffer with that size data, err := mc.buf.takeBuffer(pktLen + 4) if err != nil { - // cannot take the buffer. Something must be wrong with the connection - mc.log(err) - return errBadConnNoWrite + mc.cleanup() + return err } // ClientFlags [32 bit] @@ -404,9 +398,8 @@ func (mc *mysqlConn) writeAuthSwitchPacket(authData []byte) error { pktLen := 4 + len(authData) data, err := mc.buf.takeBuffer(pktLen) if err != nil { - // cannot take the buffer. Something must be wrong with the connection - mc.log(err) - return errBadConnNoWrite + mc.cleanup() + return err } // Add the auth data [EOF] @@ -424,9 +417,7 @@ func (mc *mysqlConn) writeCommandPacket(command byte) error { data, err := mc.buf.takeSmallBuffer(4 + 1) if err != nil { - // cannot take the buffer. Something must be wrong with the connection - mc.log(err) - return errBadConnNoWrite + return err } // Add command byte @@ -443,9 +434,7 @@ func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error { pktLen := 1 + len(arg) data, err := mc.buf.takeBuffer(pktLen + 4) if err != nil { - // cannot take the buffer. Something must be wrong with the connection - mc.log(err) - return errBadConnNoWrite + return err } // Add command byte @@ -464,9 +453,7 @@ func (mc *mysqlConn) writeCommandPacketUint32(command byte, arg uint32) error { data, err := mc.buf.takeSmallBuffer(4 + 1 + 4) if err != nil { - // cannot take the buffer. Something must be wrong with the connection - mc.log(err) - return errBadConnNoWrite + return err } // Add command byte @@ -1007,9 +994,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { // In this case the len(data) == cap(data) which is used to optimise the flow below. } if err != nil { - // cannot take the buffer. Something must be wrong with the connection - mc.log(err) - return errBadConnNoWrite + return err } // command [1 byte] @@ -1207,8 +1192,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { if valuesCap != cap(paramValues) { data = append(data[:pos], paramValues...) if err = mc.buf.store(data); err != nil { - mc.log(err) - return errBadConnNoWrite + return err } } From 2df7a26b03e5f9a55bc31544bc9240ac5705e235 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Wed, 27 Nov 2024 12:41:28 +0900 Subject: [PATCH 06/20] stmt.Close() returns nil when double close (#1642) ErrBadConn needs special care to ensure it is safe to retry. To improve maintenance, I don't want to use the error where I don't have to. Additionally, update the old comment about Go's bug that had been fixed long time ago. --- statement.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/statement.go b/statement.go index 35b02bbe..35df8545 100644 --- a/statement.go +++ b/statement.go @@ -24,11 +24,12 @@ type mysqlStmt struct { func (stmt *mysqlStmt) Close() error { if stmt.mc == nil || stmt.mc.closed.Load() { - // driver.Stmt.Close can be called more than once, thus this function - // has to be idempotent. - // See also Issue #450 and golang/go#16019. - //errLog.Print(ErrInvalidConn) - return driver.ErrBadConn + // driver.Stmt.Close could be called more than once, thus this function + // had to be idempotent. See also Issue #450 and golang/go#16019. + // This bug has been fixed in Go 1.8. + // https://github.com/golang/go/commit/90b8a0ca2d0b565c7c7199ffcf77b15ea6b6db3a + // But we keep this function idempotent because it is safer. + return nil } err := stmt.mc.writeCommandPacketUint32(comStmtClose, stmt.id) From 575e1b288d624fb14bf56532689f3ec1c1989149 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Sun, 1 Dec 2024 10:08:42 +0900 Subject: [PATCH 07/20] stop double-buffering (#1643) Since we dropped Go 1.20 support, we do not need double buffering. This pull request stop double buffering and simplify buffer implementation a lot. Fix #1435 --- buffer.go | 118 ++++++++++++++++++++--------------------------------- packets.go | 4 +- rows.go | 7 ---- 3 files changed, 45 insertions(+), 84 deletions(-) diff --git a/buffer.go b/buffer.go index d3d009cc..dd82c931 100644 --- a/buffer.go +++ b/buffer.go @@ -22,47 +22,30 @@ const maxCachedBufSize = 256 * 1024 // In other words, we can't write and read simultaneously on the same connection. // The buffer is similar to bufio.Reader / Writer but zero-copy-ish // Also highly optimized for this particular use case. -// This buffer is backed by two byte slices in a double-buffering scheme type buffer struct { - buf []byte // buf is a byte buffer who's length and capacity are equal. - nc net.Conn - idx int - length int - timeout time.Duration - dbuf [2][]byte // dbuf is an array with the two byte slices that back this buffer - flipcnt uint // flipccnt is the current buffer counter for double-buffering + buf []byte // read buffer. + cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize. + nc net.Conn + timeout time.Duration } // newBuffer allocates and returns a new buffer. func newBuffer(nc net.Conn) buffer { - fg := make([]byte, defaultBufSize) return buffer{ - buf: fg, - nc: nc, - dbuf: [2][]byte{fg, nil}, + cachedBuf: make([]byte, defaultBufSize), + nc: nc, } } -// busy returns true if the buffer contains some read data. +// busy returns true if the read buffer is not empty. func (b *buffer) busy() bool { - return b.length > 0 + return len(b.buf) > 0 } -// flip replaces the active buffer with the background buffer -// this is a delayed flip that simply increases the buffer counter; -// the actual flip will be performed the next time we call `buffer.fill` -func (b *buffer) flip() { - b.flipcnt += 1 -} - -// fill reads into the buffer until at least _need_ bytes are in it +// fill reads into the read buffer until at least _need_ bytes are in it. func (b *buffer) fill(need int) error { - n := b.length - // fill data into its double-buffering target: if we've called - // flip on this buffer, we'll be copying to the background buffer, - // and then filling it with network data; otherwise we'll just move - // the contents of the current buffer to the front before filling it - dest := b.dbuf[b.flipcnt&1] + // we'll move the contents of the current buffer to dest before filling it. + dest := b.cachedBuf // grow buffer if necessary to fit the whole packet. if need > len(dest) { @@ -72,18 +55,13 @@ func (b *buffer) fill(need int) error { // if the allocated buffer is not too large, move it to backing storage // to prevent extra allocations on applications that perform large reads if len(dest) <= maxCachedBufSize { - b.dbuf[b.flipcnt&1] = dest + b.cachedBuf = dest } } - // if we're filling the fg buffer, move the existing data to the start of it. - // if we're filling the bg buffer, copy over the data - if n > 0 { - copy(dest[:n], b.buf[b.idx:]) - } - - b.buf = dest - b.idx = 0 + // move the existing data to the start of the buffer. + n := len(b.buf) + copy(dest[:n], b.buf) for { if b.timeout > 0 { @@ -92,44 +70,39 @@ func (b *buffer) fill(need int) error { } } - nn, err := b.nc.Read(b.buf[n:]) + nn, err := b.nc.Read(dest[n:]) n += nn - switch err { - case nil: - if n < need { - continue - } - b.length = n - return nil + if err == nil && n < need { + continue + } - case io.EOF: - if n >= need { - b.length = n - return nil - } - return io.ErrUnexpectedEOF + b.buf = dest[:n] - default: - return err + if err == io.EOF { + if n < need { + err = io.ErrUnexpectedEOF + } else { + err = nil + } } + return err } } // returns next N bytes from buffer. // The returned slice is only guaranteed to be valid until the next read func (b *buffer) readNext(need int) ([]byte, error) { - if b.length < need { + if len(b.buf) < need { // refill if err := b.fill(need); err != nil { return nil, err } } - offset := b.idx - b.idx += need - b.length -= need - return b.buf[offset:b.idx], nil + data := b.buf[:need] + b.buf = b.buf[need:] + return data, nil } // takeBuffer returns a buffer with the requested size. @@ -137,18 +110,18 @@ func (b *buffer) readNext(need int) ([]byte, error) { // Otherwise a bigger buffer is made. // Only one buffer (total) can be used at a time. func (b *buffer) takeBuffer(length int) ([]byte, error) { - if b.length > 0 { + if b.busy() { return nil, ErrBusyBuffer } // test (cheap) general case first - if length <= cap(b.buf) { - return b.buf[:length], nil + if length <= len(b.cachedBuf) { + return b.cachedBuf[:length], nil } - if length < maxPacketSize { - b.buf = make([]byte, length) - return b.buf, nil + if length < maxCachedBufSize { + b.cachedBuf = make([]byte, length) + return b.cachedBuf, nil } // buffer is larger than we want to store. @@ -159,10 +132,10 @@ func (b *buffer) takeBuffer(length int) ([]byte, error) { // known to be smaller than defaultBufSize. // Only one buffer (total) can be used at a time. func (b *buffer) takeSmallBuffer(length int) ([]byte, error) { - if b.length > 0 { + if b.busy() { return nil, ErrBusyBuffer } - return b.buf[:length], nil + return b.cachedBuf[:length], nil } // takeCompleteBuffer returns the complete existing buffer. @@ -170,18 +143,15 @@ func (b *buffer) takeSmallBuffer(length int) ([]byte, error) { // cap and len of the returned buffer will be equal. // Only one buffer (total) can be used at a time. func (b *buffer) takeCompleteBuffer() ([]byte, error) { - if b.length > 0 { + if b.busy() { return nil, ErrBusyBuffer } - return b.buf, nil + return b.cachedBuf, nil } // store stores buf, an updated buffer, if its suitable to do so. -func (b *buffer) store(buf []byte) error { - if b.length > 0 { - return ErrBusyBuffer - } else if cap(buf) <= maxPacketSize && cap(buf) > cap(b.buf) { - b.buf = buf[:cap(buf)] +func (b *buffer) store(buf []byte) { + if cap(buf) <= maxCachedBufSize && cap(buf) > cap(b.cachedBuf) { + b.cachedBuf = buf[:cap(buf)] } - return nil } diff --git a/packets.go b/packets.go index 4695fb81..736e4418 100644 --- a/packets.go +++ b/packets.go @@ -1191,9 +1191,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { // In that case we must build the data packet with the new values buffer if valuesCap != cap(paramValues) { data = append(data[:pos], paramValues...) - if err = mc.buf.store(data); err != nil { - return err - } + mc.buf.store(data) // allow this buffer to be reused } pos += len(paramValues) diff --git a/rows.go b/rows.go index 81fa6062..df98417b 100644 --- a/rows.go +++ b/rows.go @@ -111,13 +111,6 @@ func (rows *mysqlRows) Close() (err error) { return err } - // flip the buffer for this connection if we need to drain it. - // note that for a successful query (i.e. one where rows.next() - // has been called until it returns false), `rows.mc` will be nil - // by the time the user calls `(*Rows).Close`, so we won't reach this - // see: https://github.com/golang/go/commit/651ddbdb5056ded455f47f9c494c67b389622a47 - mc.buf.flip() - // Remove unread packets from stream if !rows.rs.done { err = mc.readUntilEOF() From c9f41c074062d5ab9aeb5e44adeac3a7d85fbc4e Mon Sep 17 00:00:00 2001 From: Minh Quang Date: Sun, 15 Dec 2024 10:37:13 +0700 Subject: [PATCH 08/20] fix typo in comment (#1647) Fix #1646 --- AUTHORS | 1 + connection.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/AUTHORS b/AUTHORS index a9860850..361c6b64 100644 --- a/AUTHORS +++ b/AUTHORS @@ -92,6 +92,7 @@ Paul Bonser Paulius Lozys Peter Schultz Phil Porada +Minh Quang Rebecca Chin Reed Allman Richard Wilkes diff --git a/connection.go b/connection.go index c220a836..acc62708 100644 --- a/connection.go +++ b/connection.go @@ -435,7 +435,7 @@ func (mc *mysqlConn) getSystemVar(name string) ([]byte, error) { return nil, err } -// finish is called when the query has canceled. +// cancel is called when the query has canceled. func (mc *mysqlConn) cancel(err error) { mc.canceled.Set(err) mc.cleanup() From 3348e573da4c1d7186ae7d6eabd4d7333bd486a0 Mon Sep 17 00:00:00 2001 From: Joe Mann Date: Thu, 19 Dec 2024 04:14:14 +0100 Subject: [PATCH 09/20] Implement zlib compression (#1487) Implemented the SQL compression protocol. This new feature is enabled by: * Adding `compress=true` in DSN. * `cfg.Apply(Compress(True))` Co-authored-by: Brigitte Lamarche Co-authored-by: Julien Schmidt Co-authored-by: Jeffrey Charles Co-authored-by: Jeff Hodges Co-authored-by: Daniel Montoya Co-authored-by: Justin Li Co-authored-by: Dave Stubbs Co-authored-by: Linh Tran Tuan Co-authored-by: Robert R. Russell Co-authored-by: INADA Naoki Co-authored-by: Kieron Woodhouse Co-authored-by: Alexey Palazhchenko Co-authored-by: Reed Allman Co-authored-by: Joe Mann --- .github/workflows/test.yml | 2 +- AUTHORS | 2 + README.md | 11 ++ benchmark_test.go | 28 +++-- buffer.go | 26 ++--- compress.go | 214 +++++++++++++++++++++++++++++++++++++ compress_test.go | 119 +++++++++++++++++++++ connection.go | 43 +++++++- connection_test.go | 14 +-- connector.go | 10 +- const.go | 2 + driver_test.go | 45 ++++++-- dsn.go | 24 ++++- infile.go | 1 + packets.go | 113 ++++++++++++-------- packets_test.go | 24 +++-- utils.go | 12 +++ 17 files changed, 581 insertions(+), 109 deletions(-) create mode 100644 compress.go create mode 100644 compress_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b1c1f2b3..2e07fea9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -83,7 +83,7 @@ jobs: my-cnf: | innodb_log_file_size=256MB innodb_buffer_pool_size=512MB - max_allowed_packet=16MB + max_allowed_packet=48MB ; TestConcurrent fails if max_connections is too large max_connections=50 local_infile=1 diff --git a/AUTHORS b/AUTHORS index 361c6b64..cbcc90f5 100644 --- a/AUTHORS +++ b/AUTHORS @@ -21,6 +21,7 @@ Animesh Ray Arne Hormann Ariel Mashraki Asta Xie +B Lamarche Brian Hendriks Bulat Gaifullin Caine Jette @@ -62,6 +63,7 @@ Jennifer Purevsuren Jerome Meyer Jiajia Zhong Jian Zhen +Joe Mann Joshua Prunier Julien Lefevre Julien Schmidt diff --git a/README.md b/README.md index e9d9222b..da4593cc 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ A MySQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/) pac * Secure `LOAD DATA LOCAL INFILE` support with file allowlisting and `io.Reader` support * Optional `time.Time` parsing * Optional placeholder interpolation + * Supports zlib compression. ## Requirements @@ -267,6 +268,16 @@ SELECT u.id FROM users as u will return `u.id` instead of just `id` if `columnsWithAlias=true`. +##### `compress` + +``` +Type: bool +Valid Values: true, false +Default: false +``` + +Toggles zlib compression. false by default. + ##### `interpolateParams` ``` diff --git a/benchmark_test.go b/benchmark_test.go index a4ecc0a6..5c9a046b 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -46,9 +46,13 @@ func (tb *TB) checkStmt(stmt *sql.Stmt, err error) *sql.Stmt { return stmt } -func initDB(b *testing.B, queries ...string) *sql.DB { +func initDB(b *testing.B, useCompression bool, queries ...string) *sql.DB { tb := (*TB)(b) - db := tb.checkDB(sql.Open(driverNameTest, dsn)) + comprStr := "" + if useCompression { + comprStr = "&compress=1" + } + db := tb.checkDB(sql.Open(driverNameTest, dsn+comprStr)) for _, query := range queries { if _, err := db.Exec(query); err != nil { b.Fatalf("error on %q: %v", query, err) @@ -60,10 +64,18 @@ func initDB(b *testing.B, queries ...string) *sql.DB { const concurrencyLevel = 10 func BenchmarkQuery(b *testing.B) { + benchmarkQueryHelper(b, false) +} + +func BenchmarkQueryCompression(b *testing.B) { + benchmarkQueryHelper(b, true) +} + +func benchmarkQueryHelper(b *testing.B, compr bool) { tb := (*TB)(b) b.StopTimer() b.ReportAllocs() - db := initDB(b, + db := initDB(b, compr, "DROP TABLE IF EXISTS foo", "CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))", `INSERT INTO foo VALUES (1, "one")`, @@ -222,7 +234,7 @@ func BenchmarkInterpolation(b *testing.B) { }, maxAllowedPacket: maxPacketSize, maxWriteSize: maxPacketSize - 1, - buf: newBuffer(nil), + buf: newBuffer(), } args := []driver.Value{ @@ -269,7 +281,7 @@ func benchmarkQueryContext(b *testing.B, db *sql.DB, p int) { } func BenchmarkQueryContext(b *testing.B) { - db := initDB(b, + db := initDB(b, false, "DROP TABLE IF EXISTS foo", "CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))", `INSERT INTO foo VALUES (1, "one")`, @@ -305,7 +317,7 @@ func benchmarkExecContext(b *testing.B, db *sql.DB, p int) { } func BenchmarkExecContext(b *testing.B) { - db := initDB(b, + db := initDB(b, false, "DROP TABLE IF EXISTS foo", "CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))", `INSERT INTO foo VALUES (1, "one")`, @@ -323,7 +335,7 @@ func BenchmarkExecContext(b *testing.B) { // "size=" means size of each blobs. func BenchmarkQueryRawBytes(b *testing.B) { var sizes []int = []int{100, 1000, 2000, 4000, 8000, 12000, 16000, 32000, 64000, 256000} - db := initDB(b, + db := initDB(b, false, "DROP TABLE IF EXISTS bench_rawbytes", "CREATE TABLE bench_rawbytes (id INT PRIMARY KEY, val LONGBLOB)", ) @@ -376,7 +388,7 @@ func BenchmarkQueryRawBytes(b *testing.B) { // BenchmarkReceiveMassiveRows measures performance of receiving large number of rows. func BenchmarkReceiveMassiveRows(b *testing.B) { // Setup -- prepare 10000 rows. - db := initDB(b, + db := initDB(b, false, "DROP TABLE IF EXISTS foo", "CREATE TABLE foo (id INT PRIMARY KEY, val TEXT)") defer db.Close() diff --git a/buffer.go b/buffer.go index dd82c931..a6532431 100644 --- a/buffer.go +++ b/buffer.go @@ -10,13 +10,16 @@ package mysql import ( "io" - "net" - "time" ) const defaultBufSize = 4096 const maxCachedBufSize = 256 * 1024 +// readerFunc is a function that compatible with io.Reader. +// We use this function type instead of io.Reader because we want to +// just pass mc.readWithTimeout. +type readerFunc func([]byte) (int, error) + // A buffer which is used for both reading and writing. // This is possible since communication on each connection is synchronous. // In other words, we can't write and read simultaneously on the same connection. @@ -25,15 +28,12 @@ const maxCachedBufSize = 256 * 1024 type buffer struct { buf []byte // read buffer. cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize. - nc net.Conn - timeout time.Duration } // newBuffer allocates and returns a new buffer. -func newBuffer(nc net.Conn) buffer { +func newBuffer() buffer { return buffer{ cachedBuf: make([]byte, defaultBufSize), - nc: nc, } } @@ -43,7 +43,7 @@ func (b *buffer) busy() bool { } // fill reads into the read buffer until at least _need_ bytes are in it. -func (b *buffer) fill(need int) error { +func (b *buffer) fill(need int, r readerFunc) error { // we'll move the contents of the current buffer to dest before filling it. dest := b.cachedBuf @@ -64,13 +64,7 @@ func (b *buffer) fill(need int) error { copy(dest[:n], b.buf) for { - if b.timeout > 0 { - if err := b.nc.SetReadDeadline(time.Now().Add(b.timeout)); err != nil { - return err - } - } - - nn, err := b.nc.Read(dest[n:]) + nn, err := r(dest[n:]) n += nn if err == nil && n < need { @@ -92,10 +86,10 @@ func (b *buffer) fill(need int) error { // returns next N bytes from buffer. // The returned slice is only guaranteed to be valid until the next read -func (b *buffer) readNext(need int) ([]byte, error) { +func (b *buffer) readNext(need int, r readerFunc) ([]byte, error) { if len(b.buf) < need { // refill - if err := b.fill(need); err != nil { + if err := b.fill(need, r); err != nil { return nil, err } } diff --git a/compress.go b/compress.go new file mode 100644 index 00000000..fa42772a --- /dev/null +++ b/compress.go @@ -0,0 +1,214 @@ +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2024 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import ( + "bytes" + "compress/zlib" + "fmt" + "io" + "sync" +) + +var ( + zrPool *sync.Pool // Do not use directly. Use zDecompress() instead. + zwPool *sync.Pool // Do not use directly. Use zCompress() instead. +) + +func init() { + zrPool = &sync.Pool{ + New: func() any { return nil }, + } + zwPool = &sync.Pool{ + New: func() any { + zw, err := zlib.NewWriterLevel(new(bytes.Buffer), 2) + if err != nil { + panic(err) // compress/zlib return non-nil error only if level is invalid + } + return zw + }, + } +} + +func zDecompress(src []byte, dst *bytes.Buffer) (int, error) { + br := bytes.NewReader(src) + var zr io.ReadCloser + var err error + + if a := zrPool.Get(); a == nil { + if zr, err = zlib.NewReader(br); err != nil { + return 0, err + } + } else { + zr = a.(io.ReadCloser) + if err := zr.(zlib.Resetter).Reset(br, nil); err != nil { + return 0, err + } + } + + n, _ := dst.ReadFrom(zr) // ignore err because zr.Close() will return it again. + err = zr.Close() // zr.Close() may return chuecksum error. + zrPool.Put(zr) + return int(n), err +} + +func zCompress(src []byte, dst io.Writer) error { + zw := zwPool.Get().(*zlib.Writer) + zw.Reset(dst) + if _, err := zw.Write(src); err != nil { + return err + } + err := zw.Close() + zwPool.Put(zw) + return err +} + +type compIO struct { + mc *mysqlConn + buff bytes.Buffer +} + +func newCompIO(mc *mysqlConn) *compIO { + return &compIO{ + mc: mc, + } +} + +func (c *compIO) reset() { + c.buff.Reset() +} + +func (c *compIO) readNext(need int, r readerFunc) ([]byte, error) { + for c.buff.Len() < need { + if err := c.readCompressedPacket(r); err != nil { + return nil, err + } + } + data := c.buff.Next(need) + return data[:need:need], nil // prevent caller writes into c.buff +} + +func (c *compIO) readCompressedPacket(r readerFunc) error { + header, err := c.mc.buf.readNext(7, r) // size of compressed header + if err != nil { + return err + } + _ = header[6] // bounds check hint to compiler; guaranteed by readNext + + // compressed header structure + comprLength := getUint24(header[0:3]) + compressionSequence := uint8(header[3]) + uncompressedLength := getUint24(header[4:7]) + if debug { + fmt.Printf("uncompress cmplen=%v uncomplen=%v pkt_cmp_seq=%v expected_cmp_seq=%v\n", + comprLength, uncompressedLength, compressionSequence, c.mc.sequence) + } + // Do not return ErrPktSync here. + // Server may return error packet (e.g. 1153 Got a packet bigger than 'max_allowed_packet' bytes) + // before receiving all packets from client. In this case, seqnr is younger than expected. + // NOTE: Both of mariadbclient and mysqlclient do not check seqnr. Only server checks it. + if debug && compressionSequence != c.mc.sequence { + fmt.Printf("WARN: unexpected cmpress seq nr: expected %v, got %v", + c.mc.sequence, compressionSequence) + } + c.mc.sequence = compressionSequence + 1 + c.mc.compressSequence = c.mc.sequence + + comprData, err := c.mc.buf.readNext(comprLength, r) + if err != nil { + return err + } + + // if payload is uncompressed, its length will be specified as zero, and its + // true length is contained in comprLength + if uncompressedLength == 0 { + c.buff.Write(comprData) + return nil + } + + // use existing capacity in bytesBuf if possible + c.buff.Grow(uncompressedLength) + nread, err := zDecompress(comprData, &c.buff) + if err != nil { + return err + } + if nread != uncompressedLength { + return fmt.Errorf("invalid compressed packet: uncompressed length in header is %d, actual %d", + uncompressedLength, nread) + } + return nil +} + +const minCompressLength = 150 +const maxPayloadLen = maxPacketSize - 4 + +// writePackets sends one or some packets with compression. +// Use this instead of mc.netConn.Write() when mc.compress is true. +func (c *compIO) writePackets(packets []byte) (int, error) { + totalBytes := len(packets) + blankHeader := make([]byte, 7) + buf := &c.buff + + for len(packets) > 0 { + payloadLen := min(maxPayloadLen, len(packets)) + payload := packets[:payloadLen] + uncompressedLen := payloadLen + + buf.Reset() + buf.Write(blankHeader) // Buffer.Write() never returns error + + // If payload is less than minCompressLength, don't compress. + if uncompressedLen < minCompressLength { + buf.Write(payload) + uncompressedLen = 0 + } else { + err := zCompress(payload, buf) + if debug && err != nil { + fmt.Printf("zCompress error: %v", err) + } + // do not compress if compressed data is larger than uncompressed data + // I intentionally miss 7 byte header in the buf; zCompress must compress more than 7 bytes. + if err != nil || buf.Len() >= uncompressedLen { + buf.Reset() + buf.Write(blankHeader) + buf.Write(payload) + uncompressedLen = 0 + } + } + + if n, err := c.writeCompressedPacket(buf.Bytes(), uncompressedLen); err != nil { + // To allow returning ErrBadConn when sending really 0 bytes, we sum + // up compressed bytes that is returned by underlying Write(). + return totalBytes - len(packets) + n, err + } + packets = packets[payloadLen:] + } + + return totalBytes, nil +} + +// writeCompressedPacket writes a compressed packet with header. +// data should start with 7 size space for header followed by payload. +func (c *compIO) writeCompressedPacket(data []byte, uncompressedLen int) (int, error) { + mc := c.mc + comprLength := len(data) - 7 + if debug { + fmt.Printf( + "writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v", + comprLength, uncompressedLen, mc.compressSequence) + } + + // compression header + putUint24(data[0:3], comprLength) + data[3] = mc.compressSequence + putUint24(data[4:7], uncompressedLen) + + mc.compressSequence++ + return mc.writeWithTimeout(data) +} diff --git a/compress_test.go b/compress_test.go new file mode 100644 index 00000000..030deaef --- /dev/null +++ b/compress_test.go @@ -0,0 +1,119 @@ +// Go MySQL Driver - A MySQL-Driver for Go's database/sql package +// +// Copyright 2024 The Go-MySQL-Driver Authors. All rights reserved. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package mysql + +import ( + "bytes" + "crypto/rand" + "io" + "testing" +) + +func makeRandByteSlice(size int) []byte { + randBytes := make([]byte, size) + rand.Read(randBytes) + return randBytes +} + +// compressHelper compresses uncompressedPacket and checks state variables +func compressHelper(t *testing.T, mc *mysqlConn, uncompressedPacket []byte) []byte { + conn := new(mockConn) + mc.netConn = conn + + err := mc.writePacket(append(make([]byte, 4), uncompressedPacket...)) + if err != nil { + t.Fatal(err) + } + + return conn.written +} + +// uncompressHelper uncompresses compressedPacket and checks state variables +func uncompressHelper(t *testing.T, mc *mysqlConn, compressedPacket []byte) []byte { + // mocking out buf variable + conn := new(mockConn) + conn.data = compressedPacket + mc.netConn = conn + + uncompressedPacket, err := mc.readPacket() + if err != nil { + if err != io.EOF { + t.Fatalf("non-nil/non-EOF error when reading contents: %s", err.Error()) + } + } + return uncompressedPacket +} + +// roundtripHelper compresses then uncompresses uncompressedPacket and checks state variables +func roundtripHelper(t *testing.T, cSend *mysqlConn, cReceive *mysqlConn, uncompressedPacket []byte) []byte { + compressed := compressHelper(t, cSend, uncompressedPacket) + return uncompressHelper(t, cReceive, compressed) +} + +// TestRoundtrip tests two connections, where one is reading and the other is writing +func TestRoundtrip(t *testing.T) { + tests := []struct { + uncompressed []byte + desc string + }{ + {uncompressed: []byte("a"), + desc: "a"}, + {uncompressed: []byte("hello world"), + desc: "hello world"}, + {uncompressed: make([]byte, 100), + desc: "100 bytes"}, + {uncompressed: make([]byte, 32768), + desc: "32768 bytes"}, + {uncompressed: make([]byte, 330000), + desc: "33000 bytes"}, + {uncompressed: makeRandByteSlice(10), + desc: "10 rand bytes", + }, + {uncompressed: makeRandByteSlice(100), + desc: "100 rand bytes", + }, + {uncompressed: makeRandByteSlice(32768), + desc: "32768 rand bytes", + }, + {uncompressed: bytes.Repeat(makeRandByteSlice(100), 10000), + desc: "100 rand * 10000 repeat bytes", + }, + } + + _, cSend := newRWMockConn(0) + cSend.compress = true + cSend.compIO = newCompIO(cSend) + _, cReceive := newRWMockConn(0) + cReceive.compress = true + cReceive.compIO = newCompIO(cReceive) + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + cSend.resetSequence() + cReceive.resetSequence() + + uncompressed := roundtripHelper(t, cSend, cReceive, test.uncompressed) + if len(uncompressed) != len(test.uncompressed) { + t.Errorf("uncompressed size is unexpected. expected %d but got %d", + len(test.uncompressed), len(uncompressed)) + } + if !bytes.Equal(uncompressed, test.uncompressed) { + t.Errorf("roundtrip failed") + } + if cSend.sequence != cReceive.sequence { + t.Errorf("inconsistent sequence number: send=%v recv=%v", + cSend.sequence, cReceive.sequence) + } + if cSend.compressSequence != cReceive.compressSequence { + t.Errorf("inconsistent compress sequence number: send=%v recv=%v", + cSend.compressSequence, cReceive.compressSequence) + } + }) + } +} diff --git a/connection.go b/connection.go index acc62708..3e455a3f 100644 --- a/connection.go +++ b/connection.go @@ -28,15 +28,17 @@ type mysqlConn struct { netConn net.Conn rawConn net.Conn // underlying connection when netConn is TLS connection. result mysqlResult // managed by clearResult() and handleOkPacket(). + compIO *compIO cfg *Config connector *connector maxAllowedPacket int maxWriteSize int - writeTimeout time.Duration flags clientFlag status statusFlag sequence uint8 + compressSequence uint8 parseTime bool + compress bool // for context support (Go 1.8+) watching bool @@ -62,6 +64,43 @@ func (mc *mysqlConn) log(v ...any) { mc.cfg.Logger.Print(v...) } +func (mc *mysqlConn) readWithTimeout(b []byte) (int, error) { + to := mc.cfg.ReadTimeout + if to > 0 { + if err := mc.netConn.SetReadDeadline(time.Now().Add(to)); err != nil { + return 0, err + } + } + return mc.netConn.Read(b) +} + +func (mc *mysqlConn) writeWithTimeout(b []byte) (int, error) { + to := mc.cfg.WriteTimeout + if to > 0 { + if err := mc.netConn.SetWriteDeadline(time.Now().Add(to)); err != nil { + return 0, err + } + } + return mc.netConn.Write(b) +} + +func (mc *mysqlConn) resetSequence() { + mc.sequence = 0 + mc.compressSequence = 0 +} + +// syncSequence must be called when finished writing some packet and before start reading. +func (mc *mysqlConn) syncSequence() { + // Syncs compressionSequence to sequence. + // This is not documented but done in `net_flush()` in MySQL and MariaDB. + // https://github.com/mariadb-corporation/mariadb-connector-c/blob/8228164f850b12353da24df1b93a1e53cc5e85e9/libmariadb/ma_net.c#L170-L171 + // https://github.com/mysql/mysql-server/blob/824e2b4064053f7daf17d7f3f84b7a3ed92e5fb4/sql-common/net_serv.cc#L293 + if mc.compress { + mc.sequence = mc.compressSequence + mc.compIO.reset() + } +} + // Handles parameters set in DSN after the connection is established func (mc *mysqlConn) handleParams() (err error) { var cmdSet strings.Builder @@ -147,7 +186,7 @@ func (mc *mysqlConn) cleanup() { return } if err := conn.Close(); err != nil { - mc.log(err) + mc.log("closing connection:", err) } // This function can be called from multiple goroutines. // So we can not mc.clearResult() here. diff --git a/connection_test.go b/connection_test.go index 6f8d2a6d..f7740898 100644 --- a/connection_test.go +++ b/connection_test.go @@ -19,7 +19,7 @@ import ( func TestInterpolateParams(t *testing.T) { mc := &mysqlConn{ - buf: newBuffer(nil), + buf: newBuffer(), maxAllowedPacket: maxPacketSize, cfg: &Config{ InterpolateParams: true, @@ -39,7 +39,7 @@ func TestInterpolateParams(t *testing.T) { func TestInterpolateParamsJSONRawMessage(t *testing.T) { mc := &mysqlConn{ - buf: newBuffer(nil), + buf: newBuffer(), maxAllowedPacket: maxPacketSize, cfg: &Config{ InterpolateParams: true, @@ -66,7 +66,7 @@ func TestInterpolateParamsJSONRawMessage(t *testing.T) { func TestInterpolateParamsTooManyPlaceholders(t *testing.T) { mc := &mysqlConn{ - buf: newBuffer(nil), + buf: newBuffer(), maxAllowedPacket: maxPacketSize, cfg: &Config{ InterpolateParams: true, @@ -83,7 +83,7 @@ func TestInterpolateParamsTooManyPlaceholders(t *testing.T) { // https://github.com/go-sql-driver/mysql/pull/490 func TestInterpolateParamsPlaceholderInString(t *testing.T) { mc := &mysqlConn{ - buf: newBuffer(nil), + buf: newBuffer(), maxAllowedPacket: maxPacketSize, cfg: &Config{ InterpolateParams: true, @@ -99,7 +99,7 @@ func TestInterpolateParamsPlaceholderInString(t *testing.T) { func TestInterpolateParamsUint64(t *testing.T) { mc := &mysqlConn{ - buf: newBuffer(nil), + buf: newBuffer(), maxAllowedPacket: maxPacketSize, cfg: &Config{ InterpolateParams: true, @@ -161,7 +161,7 @@ func TestPingMarkBadConnection(t *testing.T) { nc := badConnection{err: errors.New("boom")} mc := &mysqlConn{ netConn: nc, - buf: newBuffer(nc), + buf: newBuffer(), maxAllowedPacket: defaultMaxAllowedPacket, closech: make(chan struct{}), cfg: NewConfig(), @@ -178,7 +178,7 @@ func TestPingErrInvalidConn(t *testing.T) { nc := badConnection{err: errors.New("failed to write"), n: 10} mc := &mysqlConn{ netConn: nc, - buf: newBuffer(nc), + buf: newBuffer(), maxAllowedPacket: defaultMaxAllowedPacket, closech: make(chan struct{}), cfg: NewConfig(), diff --git a/connector.go b/connector.go index 769b3adc..a4f3655e 100644 --- a/connector.go +++ b/connector.go @@ -127,11 +127,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { } defer mc.finish() - mc.buf = newBuffer(mc.netConn) - - // Set I/O timeouts - mc.buf.timeout = mc.cfg.ReadTimeout - mc.writeTimeout = mc.cfg.WriteTimeout + mc.buf = newBuffer() // Reading Handshake Initialization Packet authData, plugin, err := mc.readHandshakePacket() @@ -170,6 +166,10 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { return nil, err } + if mc.cfg.compress && mc.flags&clientCompress == clientCompress { + mc.compress = true + mc.compIO = newCompIO(mc) + } if mc.cfg.MaxAllowedPacket > 0 { mc.maxAllowedPacket = mc.cfg.MaxAllowedPacket } else { diff --git a/const.go b/const.go index 0cee9b2e..4aadcd64 100644 --- a/const.go +++ b/const.go @@ -11,6 +11,8 @@ package mysql import "runtime" const ( + debug = false // for debugging. Set true only in development. + defaultAuthPlugin = "mysql_native_password" defaultMaxAllowedPacket = 64 << 20 // 64 MiB. See https://github.com/go-sql-driver/mysql/issues/1355 minProtocolVersion = 10 diff --git a/driver_test.go b/driver_test.go index 24d73c34..58b3cb38 100644 --- a/driver_test.go +++ b/driver_test.go @@ -147,12 +147,11 @@ func runTests(t *testing.T, dsn string, tests ...func(dbt *DBTest)) { db, err := sql.Open(driverNameTest, dsn) if err != nil { - t.Fatalf("error connecting: %s", err.Error()) + t.Fatalf("connecting %q: %s", dsn, err) } defer db.Close() - - cleanup := func() { - db.Exec("DROP TABLE IF EXISTS test") + if err = db.Ping(); err != nil { + t.Fatalf("connecting %q: %s", dsn, err) } dsn2 := dsn + "&interpolateParams=true" @@ -160,25 +159,46 @@ func runTests(t *testing.T, dsn string, tests ...func(dbt *DBTest)) { if _, err := ParseDSN(dsn2); err != errInvalidDSNUnsafeCollation { db2, err = sql.Open(driverNameTest, dsn2) if err != nil { - t.Fatalf("error connecting: %s", err.Error()) + t.Fatalf("connecting %q: %s", dsn2, err) } defer db2.Close() } + dsn3 := dsn + "&compress=true" + var db3 *sql.DB + db3, err = sql.Open(driverNameTest, dsn3) + if err != nil { + t.Fatalf("connecting %q: %s", dsn3, err) + } + defer db3.Close() + + cleanupSql := "DROP TABLE IF EXISTS test" + for _, test := range tests { test := test t.Run("default", func(t *testing.T) { dbt := &DBTest{t, db} - t.Cleanup(cleanup) + t.Cleanup(func() { + db.Exec(cleanupSql) + }) test(dbt) }) if db2 != nil { t.Run("interpolateParams", func(t *testing.T) { dbt2 := &DBTest{t, db2} - t.Cleanup(cleanup) + t.Cleanup(func() { + db2.Exec(cleanupSql) + }) test(dbt2) }) } + t.Run("compress", func(t *testing.T) { + dbt3 := &DBTest{t, db3} + t.Cleanup(func() { + db3.Exec(cleanupSql) + }) + test(dbt3) + }) } } @@ -958,12 +978,16 @@ func TestDateTime(t *testing.T) { var err error rows, err = dbt.db.Query(`SELECT cast("00:00:00.1" as TIME(1)) = "00:00:00.1"`) if err == nil { - rows.Scan(µsecsSupported) + if rows.Next() { + rows.Scan(µsecsSupported) + } rows.Close() } rows, err = dbt.db.Query(`SELECT cast("0000-00-00" as DATE) = "0000-00-00"`) if err == nil { - rows.Scan(&zeroDateSupported) + if rows.Next() { + rows.Scan(&zeroDateSupported) + } rows.Close() } for _, setups := range testcases { @@ -1265,8 +1289,7 @@ func TestLongData(t *testing.T) { var rows *sql.Rows // Long text data - const nonDataQueryLen = 28 // length query w/o value - inS := in[:maxAllowedPacketSize-nonDataQueryLen] + inS := in[:maxAllowedPacketSize-100] dbt.mustExec("INSERT INTO test VALUES('" + inS + "')") rows = dbt.mustQuery("SELECT value FROM test") defer rows.Close() diff --git a/dsn.go b/dsn.go index f391a8fc..9b560b73 100644 --- a/dsn.go +++ b/dsn.go @@ -73,7 +73,10 @@ type Config struct { ParseTime bool // Parse time values to time.Time RejectReadOnly bool // Reject read-only connections - // unexported fields. new options should be come here + // unexported fields. new options should be come here. + // boolean first. alphabetical order. + + compress bool // Enable zlib compression beforeConnect func(context.Context, *Config) error // Invoked before a connection is established pubKey *rsa.PublicKey // Server public key @@ -93,7 +96,6 @@ func NewConfig() *Config { AllowNativePasswords: true, CheckConnLiveness: true, } - return cfg } @@ -125,6 +127,14 @@ func BeforeConnect(fn func(context.Context, *Config) error) Option { } } +// EnableCompress sets the compression mode. +func EnableCompression(yes bool) Option { + return func(cfg *Config) error { + cfg.compress = yes + return nil + } +} + func (cfg *Config) Clone() *Config { cp := *cfg if cp.TLS != nil { @@ -297,6 +307,10 @@ func (cfg *Config) FormatDSN() string { writeDSNParam(&buf, &hasParam, "columnsWithAlias", "true") } + if cfg.compress { + writeDSNParam(&buf, &hasParam, "compress", "true") + } + if cfg.InterpolateParams { writeDSNParam(&buf, &hasParam, "interpolateParams", "true") } @@ -525,7 +539,11 @@ func parseDSNParams(cfg *Config, params string) (err error) { // Compression case "compress": - return errors.New("compression not implemented yet") + var isBool bool + cfg.compress, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } // Enable client side placeholder substitution case "interpolateParams": diff --git a/infile.go b/infile.go index cf892bea..555ef71a 100644 --- a/infile.go +++ b/infile.go @@ -172,6 +172,7 @@ func (mc *okHandler) handleInFileRequest(name string) (err error) { if ioErr := mc.conn().writePacket(data[:4]); ioErr != nil { return ioErr } + mc.conn().syncSequence() // read OK packet if err == nil { diff --git a/packets.go b/packets.go index 736e4418..e4d2820e 100644 --- a/packets.go +++ b/packets.go @@ -28,9 +28,16 @@ import ( // Read packet to buffer 'data' func (mc *mysqlConn) readPacket() ([]byte, error) { var prevData []byte + invalidSequence := false + + readNext := mc.buf.readNext + if mc.compress { + readNext = mc.compIO.readNext + } + for { // read packet header - data, err := mc.buf.readNext(4) + data, err := readNext(4, mc.readWithTimeout) if err != nil { mc.close() if cerr := mc.canceled.Value(); cerr != nil { @@ -41,17 +48,29 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { } // packet length [24 bit] - pktLen := int(uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16) - - // check packet sync [8 bit] - if data[3] != mc.sequence { - mc.close() - if data[3] > mc.sequence { - return nil, ErrPktSyncMul + pktLen := getUint24(data[:3]) + seq := data[3] + + if mc.compress { + // MySQL and MariaDB doesn't check packet nr in compressed packet. + if debug && seq != mc.compressSequence { + fmt.Printf("[debug] mismatched compression sequence nr: expected: %v, got %v", + mc.compressSequence, seq) + } + mc.compressSequence = seq + 1 + } else { + // check packet sync [8 bit] + if seq != mc.sequence { + mc.log(fmt.Sprintf("[warn] unexpected seq nr: expected %v, got %v", mc.sequence, seq)) + // For large packets, we stop reading as soon as sync error. + if len(prevData) > 0 { + mc.close() + return nil, ErrPktSyncMul + } + invalidSequence = true } - return nil, ErrPktSync + mc.sequence++ } - mc.sequence++ // packets with length 0 terminate a previous packet which is a // multiple of (2^24)-1 bytes long @@ -62,12 +81,11 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { mc.close() return nil, ErrInvalidConn } - return prevData, nil } // read packet body [pktLen bytes] - data, err = mc.buf.readNext(pktLen) + data, err = readNext(pktLen, mc.readWithTimeout) if err != nil { mc.close() if cerr := mc.canceled.Value(); cerr != nil { @@ -80,11 +98,18 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { // return data if this was the last packet if pktLen < maxPacketSize { // zero allocations for non-split packets - if prevData == nil { - return data, nil + if prevData != nil { + data = append(prevData, data...) } - - return append(prevData, data...), nil + if invalidSequence { + mc.close() + // return sync error only for regular packet. + // error packets may have wrong sequence number. + if data[0] != iERR { + return nil, ErrPktSync + } + } + return data, nil } prevData = append(prevData, data...) @@ -94,36 +119,26 @@ func (mc *mysqlConn) readPacket() ([]byte, error) { // Write packet buffer 'data' func (mc *mysqlConn) writePacket(data []byte) error { pktLen := len(data) - 4 - if pktLen > mc.maxAllowedPacket { return ErrPktTooLarge } + writeFunc := mc.writeWithTimeout + if mc.compress { + writeFunc = mc.compIO.writePackets + } + for { - var size int - if pktLen >= maxPacketSize { - data[0] = 0xff - data[1] = 0xff - data[2] = 0xff - size = maxPacketSize - } else { - data[0] = byte(pktLen) - data[1] = byte(pktLen >> 8) - data[2] = byte(pktLen >> 16) - size = pktLen - } + size := min(maxPacketSize, pktLen) + putUint24(data[:3], size) data[3] = mc.sequence // Write packet - if mc.writeTimeout > 0 { - if err := mc.netConn.SetWriteDeadline(time.Now().Add(mc.writeTimeout)); err != nil { - mc.cleanup() - mc.log(err) - return err - } + if debug { + fmt.Printf("writePacket: size=%v seq=%v", size, mc.sequence) } - n, err := mc.netConn.Write(data[:4+size]) + n, err := writeFunc(data[:4+size]) if err != nil { mc.cleanup() if cerr := mc.canceled.Value(); cerr != nil { @@ -267,7 +282,9 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string if mc.cfg.ClientFoundRows { clientFlags |= clientFoundRows } - + if mc.cfg.compress && mc.flags&clientCompress == clientCompress { + clientFlags |= clientCompress + } // To enable TLS / SSL if mc.cfg.TLS != nil { clientFlags |= clientSSL @@ -358,7 +375,6 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string return err } mc.netConn = tlsConn - mc.buf.nc = tlsConn } // User [null terminated string] @@ -413,7 +429,7 @@ func (mc *mysqlConn) writeAuthSwitchPacket(authData []byte) error { func (mc *mysqlConn) writeCommandPacket(command byte) error { // Reset Packet Sequence - mc.sequence = 0 + mc.resetSequence() data, err := mc.buf.takeSmallBuffer(4 + 1) if err != nil { @@ -429,7 +445,7 @@ func (mc *mysqlConn) writeCommandPacket(command byte) error { func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error { // Reset Packet Sequence - mc.sequence = 0 + mc.resetSequence() pktLen := 1 + len(arg) data, err := mc.buf.takeBuffer(pktLen + 4) @@ -444,12 +460,14 @@ func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error { copy(data[5:], arg) // Send CMD packet - return mc.writePacket(data) + err = mc.writePacket(data) + mc.syncSequence() + return err } func (mc *mysqlConn) writeCommandPacketUint32(command byte, arg uint32) error { // Reset Packet Sequence - mc.sequence = 0 + mc.resetSequence() data, err := mc.buf.takeSmallBuffer(4 + 1 + 4) if err != nil { @@ -932,7 +950,7 @@ func (stmt *mysqlStmt) writeCommandLongData(paramID int, arg []byte) error { pktLen = dataOffset + argLen } - stmt.mc.sequence = 0 + stmt.mc.resetSequence() // Add command byte [1 byte] data[4] = comStmtSendLongData @@ -953,11 +971,10 @@ func (stmt *mysqlStmt) writeCommandLongData(paramID int, arg []byte) error { continue } return err - } // Reset Packet Sequence - stmt.mc.sequence = 0 + stmt.mc.resetSequence() return nil } @@ -982,7 +999,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { } // Reset packet-sequence - mc.sequence = 0 + mc.resetSequence() var data []byte var err error @@ -1198,7 +1215,9 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { data = data[:pos] } - return mc.writePacket(data) + err = mc.writePacket(data) + mc.syncSequence() + return err } // For each remaining resultset in the stream, discards its rows and updates diff --git a/packets_test.go b/packets_test.go index fa4683ea..694b0564 100644 --- a/packets_test.go +++ b/packets_test.go @@ -98,7 +98,7 @@ func newRWMockConn(sequence uint8) (*mockConn, *mysqlConn) { conn := new(mockConn) connector := newConnector(NewConfig()) mc := &mysqlConn{ - buf: newBuffer(conn), + buf: newBuffer(), cfg: connector.cfg, connector: connector, netConn: conn, @@ -112,7 +112,9 @@ func newRWMockConn(sequence uint8) (*mockConn, *mysqlConn) { func TestReadPacketSingleByte(t *testing.T) { conn := new(mockConn) mc := &mysqlConn{ - buf: newBuffer(conn), + netConn: conn, + buf: newBuffer(), + cfg: NewConfig(), } conn.data = []byte{0x01, 0x00, 0x00, 0x00, 0xff} @@ -143,12 +145,12 @@ func TestReadPacketWrongSequenceID(t *testing.T) { { ClientSequenceID: 0, ServerSequenceID: 0x42, - ExpectedErr: ErrPktSyncMul, + ExpectedErr: ErrPktSync, }, } { conn, mc := newRWMockConn(testCase.ClientSequenceID) - conn.data = []byte{0x01, 0x00, 0x00, testCase.ServerSequenceID, 0xff} + conn.data = []byte{0x01, 0x00, 0x00, testCase.ServerSequenceID, 0x22} _, err := mc.readPacket() if err != testCase.ExpectedErr { t.Errorf("expected %v, got %v", testCase.ExpectedErr, err) @@ -164,7 +166,9 @@ func TestReadPacketWrongSequenceID(t *testing.T) { func TestReadPacketSplit(t *testing.T) { conn := new(mockConn) mc := &mysqlConn{ - buf: newBuffer(conn), + netConn: conn, + buf: newBuffer(), + cfg: NewConfig(), } data := make([]byte, maxPacketSize*2+4*3) @@ -269,7 +273,8 @@ func TestReadPacketSplit(t *testing.T) { func TestReadPacketFail(t *testing.T) { conn := new(mockConn) mc := &mysqlConn{ - buf: newBuffer(conn), + netConn: conn, + buf: newBuffer(), closech: make(chan struct{}), cfg: NewConfig(), } @@ -285,7 +290,7 @@ func TestReadPacketFail(t *testing.T) { // reset conn.reads = 0 mc.sequence = 0 - mc.buf = newBuffer(conn) + mc.buf = newBuffer() // fail to read header conn.closed = true @@ -298,7 +303,7 @@ func TestReadPacketFail(t *testing.T) { conn.closed = false conn.reads = 0 mc.sequence = 0 - mc.buf = newBuffer(conn) + mc.buf = newBuffer() // fail to read body conn.maxReads = 1 @@ -313,7 +318,8 @@ func TestReadPacketFail(t *testing.T) { func TestRegression801(t *testing.T) { conn := new(mockConn) mc := &mysqlConn{ - buf: newBuffer(conn), + netConn: conn, + buf: newBuffer(), cfg: new(Config), sequence: 42, closech: make(chan struct{}), diff --git a/utils.go b/utils.go index cda24fe7..d902f3b6 100644 --- a/utils.go +++ b/utils.go @@ -490,6 +490,18 @@ func formatBinaryTime(src []byte, length uint8) (driver.Value, error) { * Convert from and to bytes * ******************************************************************************/ +// 24bit integer: used for packet headers. + +func putUint24(data []byte, n int) { + data[2] = byte(n >> 16) + data[1] = byte(n >> 8) + data[0] = byte(n) +} + +func getUint24(data []byte) int { + return int(data[2])<<16 | int(data[1])<<8 | int(data[0]) +} + func uint64ToBytes(n uint64) []byte { return []byte{ byte(n), From b335ed33d6a10803949fb71bbd7e0974c5be38b2 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Fri, 20 Dec 2024 14:13:32 +0900 Subject: [PATCH 10/20] use binary.LittleEndian (#1651) Recent Go does inlinine functions well. Using `LittleEndian.Put*` would better for readability and minimize bound check. Additionally, Go 1.19 introduced `LittleEndian.Append*`. It reduce more code. --- packets.go | 72 ++++++++---------------------------------------------- utils.go | 29 ++++++---------------- 2 files changed, 17 insertions(+), 84 deletions(-) diff --git a/packets.go b/packets.go index e4d2820e..f3860c5f 100644 --- a/packets.go +++ b/packets.go @@ -329,16 +329,10 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string } // ClientFlags [32 bit] - data[4] = byte(clientFlags) - data[5] = byte(clientFlags >> 8) - data[6] = byte(clientFlags >> 16) - data[7] = byte(clientFlags >> 24) + binary.LittleEndian.PutUint32(data[4:], uint32(clientFlags)) // MaxPacketSize [32 bit] (none) - data[8] = 0x00 - data[9] = 0x00 - data[10] = 0x00 - data[11] = 0x00 + binary.LittleEndian.PutUint32(data[8:], 0) // Collation ID [1 byte] data[12] = defaultCollationID @@ -478,10 +472,7 @@ func (mc *mysqlConn) writeCommandPacketUint32(command byte, arg uint32) error { data[4] = command // Add arg [32 bit] - data[5] = byte(arg) - data[6] = byte(arg >> 8) - data[7] = byte(arg >> 16) - data[8] = byte(arg >> 24) + binary.LittleEndian.PutUint32(data[5:], arg) // Send CMD packet return mc.writePacket(data) @@ -955,14 +946,10 @@ func (stmt *mysqlStmt) writeCommandLongData(paramID int, arg []byte) error { data[4] = comStmtSendLongData // Add stmtID [32 bit] - data[5] = byte(stmt.id) - data[6] = byte(stmt.id >> 8) - data[7] = byte(stmt.id >> 16) - data[8] = byte(stmt.id >> 24) + binary.LittleEndian.PutUint32(data[5:], stmt.id) // Add paramID [16 bit] - data[9] = byte(paramID) - data[10] = byte(paramID >> 8) + binary.LittleEndian.PutUint16(data[9:], uint16(paramID)) // Send CMD packet err := stmt.mc.writePacket(data[:4+pktLen]) @@ -1018,19 +1005,13 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { data[4] = comStmtExecute // statement_id [4 bytes] - data[5] = byte(stmt.id) - data[6] = byte(stmt.id >> 8) - data[7] = byte(stmt.id >> 16) - data[8] = byte(stmt.id >> 24) + binary.LittleEndian.PutUint32(data[5:], stmt.id) // flags (0: CURSOR_TYPE_NO_CURSOR) [1 byte] data[9] = 0x00 // iteration_count (uint32(1)) [4 bytes] - data[10] = 0x01 - data[11] = 0x00 - data[12] = 0x00 - data[13] = 0x00 + binary.LittleEndian.PutUint32(data[10:], 1) if len(args) > 0 { pos := minPktLen @@ -1084,50 +1065,17 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error { case int64: paramTypes[i+i] = byte(fieldTypeLongLong) paramTypes[i+i+1] = 0x00 - - if cap(paramValues)-len(paramValues)-8 >= 0 { - paramValues = paramValues[:len(paramValues)+8] - binary.LittleEndian.PutUint64( - paramValues[len(paramValues)-8:], - uint64(v), - ) - } else { - paramValues = append(paramValues, - uint64ToBytes(uint64(v))..., - ) - } + paramValues = binary.LittleEndian.AppendUint64(paramValues, uint64(v)) case uint64: paramTypes[i+i] = byte(fieldTypeLongLong) paramTypes[i+i+1] = 0x80 // type is unsigned - - if cap(paramValues)-len(paramValues)-8 >= 0 { - paramValues = paramValues[:len(paramValues)+8] - binary.LittleEndian.PutUint64( - paramValues[len(paramValues)-8:], - uint64(v), - ) - } else { - paramValues = append(paramValues, - uint64ToBytes(uint64(v))..., - ) - } + paramValues = binary.LittleEndian.AppendUint64(paramValues, uint64(v)) case float64: paramTypes[i+i] = byte(fieldTypeDouble) paramTypes[i+i+1] = 0x00 - - if cap(paramValues)-len(paramValues)-8 >= 0 { - paramValues = paramValues[:len(paramValues)+8] - binary.LittleEndian.PutUint64( - paramValues[len(paramValues)-8:], - math.Float64bits(v), - ) - } else { - paramValues = append(paramValues, - uint64ToBytes(math.Float64bits(v))..., - ) - } + paramValues = binary.LittleEndian.AppendUint64(paramValues, math.Float64bits(v)) case bool: paramTypes[i+i] = byte(fieldTypeTiny) diff --git a/utils.go b/utils.go index d902f3b6..44f43ef7 100644 --- a/utils.go +++ b/utils.go @@ -502,19 +502,6 @@ func getUint24(data []byte) int { return int(data[2])<<16 | int(data[1])<<8 | int(data[0]) } -func uint64ToBytes(n uint64) []byte { - return []byte{ - byte(n), - byte(n >> 8), - byte(n >> 16), - byte(n >> 24), - byte(n >> 32), - byte(n >> 40), - byte(n >> 48), - byte(n >> 56), - } -} - func uint64ToString(n uint64) []byte { var a [20]byte i := 20 @@ -598,18 +585,15 @@ func readLengthEncodedInteger(b []byte) (uint64, bool, int) { // 252: value of following 2 case 0xfc: - return uint64(b[1]) | uint64(b[2])<<8, false, 3 + return uint64(binary.LittleEndian.Uint16(b[1:])), false, 3 // 253: value of following 3 case 0xfd: - return uint64(b[1]) | uint64(b[2])<<8 | uint64(b[3])<<16, false, 4 + return uint64(getUint24(b[1:])), false, 4 // 254: value of following 8 case 0xfe: - return uint64(b[1]) | uint64(b[2])<<8 | uint64(b[3])<<16 | - uint64(b[4])<<24 | uint64(b[5])<<32 | uint64(b[6])<<40 | - uint64(b[7])<<48 | uint64(b[8])<<56, - false, 9 + return uint64(binary.LittleEndian.Uint64(b[1:])), false, 9 } // 0-250: value of first byte @@ -623,13 +607,14 @@ func appendLengthEncodedInteger(b []byte, n uint64) []byte { return append(b, byte(n)) case n <= 0xffff: - return append(b, 0xfc, byte(n), byte(n>>8)) + b = append(b, 0xfc) + return binary.LittleEndian.AppendUint16(b, uint16(n)) case n <= 0xffffff: return append(b, 0xfd, byte(n), byte(n>>8), byte(n>>16)) } - return append(b, 0xfe, byte(n), byte(n>>8), byte(n>>16), byte(n>>24), - byte(n>>32), byte(n>>40), byte(n>>48), byte(n>>56)) + b = append(b, 0xfe) + return binary.LittleEndian.AppendUint64(b, n) } func appendLengthEncodedString(b []byte, s string) []byte { From 7403860363ca112af503b4612568c3096fecb466 Mon Sep 17 00:00:00 2001 From: Artur Melanchyk Date: Tue, 24 Dec 2024 05:10:11 +0100 Subject: [PATCH 11/20] Make fileRegister a set (#1653) --- AUTHORS | 1 + infile.go | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/AUTHORS b/AUTHORS index cbcc90f5..a3839579 100644 --- a/AUTHORS +++ b/AUTHORS @@ -20,6 +20,7 @@ Andrew Reid Animesh Ray Arne Hormann Ariel Mashraki +Artur Melanchyk Asta Xie B Lamarche Brian Hendriks diff --git a/infile.go b/infile.go index 555ef71a..453ae091 100644 --- a/infile.go +++ b/infile.go @@ -17,7 +17,7 @@ import ( ) var ( - fileRegister map[string]bool + fileRegister map[string]struct{} fileRegisterLock sync.RWMutex readerRegister map[string]func() io.Reader readerRegisterLock sync.RWMutex @@ -37,10 +37,10 @@ func RegisterLocalFile(filePath string) { fileRegisterLock.Lock() // lazy map init if fileRegister == nil { - fileRegister = make(map[string]bool) + fileRegister = make(map[string]struct{}) } - fileRegister[strings.Trim(filePath, `"`)] = true + fileRegister[strings.Trim(filePath, `"`)] = struct{}{} fileRegisterLock.Unlock() } @@ -123,9 +123,9 @@ func (mc *okHandler) handleInFileRequest(name string) (err error) { } else { // File name = strings.Trim(name, `"`) fileRegisterLock.RLock() - fr := fileRegister[name] + _, exists := fileRegister[name] fileRegisterLock.RUnlock() - if mc.cfg.AllowAllFiles || fr { + if mc.cfg.AllowAllFiles || exists { var file *os.File var fi os.FileInfo From 255d1ad98f1d3be99661d2a8c0a7a91418acbc8d Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Wed, 22 Jan 2025 14:59:24 +0900 Subject: [PATCH 12/20] better max_allowed_packet parsing (#1661) Remove `stringToInt()` and use `strconv.Atoi` instead. --- connector.go | 8 +++++++- utils.go | 10 ---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/connector.go b/connector.go index a4f3655e..bc1d46af 100644 --- a/connector.go +++ b/connector.go @@ -11,6 +11,7 @@ package mysql import ( "context" "database/sql/driver" + "fmt" "net" "os" "strconv" @@ -179,7 +180,12 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { mc.Close() return nil, err } - mc.maxAllowedPacket = stringToInt(maxap) - 1 + n, err := strconv.Atoi(string(maxap)) + if err != nil { + mc.Close() + return nil, fmt.Errorf("invalid max_allowed_packet value (%q): %w", maxap, err) + } + mc.maxAllowedPacket = n - 1 } if mc.maxAllowedPacket < maxPacketSize { mc.maxWriteSize = mc.maxAllowedPacket diff --git a/utils.go b/utils.go index 44f43ef7..8716c26c 100644 --- a/utils.go +++ b/utils.go @@ -524,16 +524,6 @@ func uint64ToString(n uint64) []byte { return a[i:] } -// treats string value as unsigned integer representation -func stringToInt(b []byte) int { - val := 0 - for i := range b { - val *= 10 - val += int(b[i] - 0x30) - } - return val -} - // returns the string read as a bytes slice, whether the value is NULL, // the number of bytes read and an error, in case the string is longer than // the input slice From 85c6311943c82f1300077b2d0e94687106ab61e7 Mon Sep 17 00:00:00 2001 From: Kolbe Kegel Date: Thu, 23 Jan 2025 01:32:31 -0800 Subject: [PATCH 13/20] Add error 1290/ER_READ_ONLY_MODE to rejectReadOnly handling (#1660) --- packets.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packets.go b/packets.go index f3860c5f..9951bdf8 100644 --- a/packets.go +++ b/packets.go @@ -574,7 +574,8 @@ func (mc *mysqlConn) handleErrorPacket(data []byte) error { // 1792: ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION // 1290: ER_OPTION_PREVENTS_STATEMENT (returned by Aurora during failover) - if (errno == 1792 || errno == 1290) && mc.cfg.RejectReadOnly { + // 1836: ER_READ_ONLY_MODE + if (errno == 1792 || errno == 1290 || errno == 1836) && mc.cfg.RejectReadOnly { // Oops; we are connected to a read-only connection, and won't be able // to issue any write statements. Since RejectReadOnly is configured, // we throw away this connection hoping this one would have write From 341a5a5246835b2ac4b8d36bb12a9dfad70663f4 Mon Sep 17 00:00:00 2001 From: Bes Dollma <143414965+bdollma-te@users.noreply.github.com> Date: Wed, 29 Jan 2025 07:59:01 +0200 Subject: [PATCH 14/20] Fix auth_switch_request packet handling auth_data contains last NUL. Fix #1666 Signed-off-by: Bes Dollma (bdollma) --- AUTHORS | 2 ++ auth_test.go | 24 ++++++++++++------------ packets.go | 3 +++ 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/AUTHORS b/AUTHORS index a3839579..123b5dc5 100644 --- a/AUTHORS +++ b/AUTHORS @@ -23,6 +23,7 @@ Ariel Mashraki Artur Melanchyk Asta Xie B Lamarche +Bes Dollma Brian Hendriks Bulat Gaifullin Caine Jette @@ -146,4 +147,5 @@ PingCAP Inc. Pivotal Inc. Shattered Silicon Ltd. Stripe Inc. +ThousandEyes Zendesk Inc. diff --git a/auth_test.go b/auth_test.go index 8caed1ff..46e1e3b4 100644 --- a/auth_test.go +++ b/auth_test.go @@ -734,9 +734,9 @@ func TestAuthSwitchCachingSHA256PasswordCached(t *testing.T) { expectedReply := []byte{ // 1. Packet: Hash - 32, 0, 0, 3, 129, 93, 132, 95, 114, 48, 79, 215, 128, 62, 193, 118, 128, - 54, 75, 208, 159, 252, 227, 215, 129, 15, 242, 97, 19, 159, 31, 20, 58, - 153, 9, 130, + 32, 0, 0, 3, 219, 72, 64, 97, 56, 197, 167, 203, 64, 236, 168, 80, 223, + 56, 103, 217, 196, 176, 124, 60, 253, 41, 195, 10, 205, 190, 177, 206, 63, + 118, 211, 69, } if !bytes.Equal(conn.written, expectedReply) { t.Errorf("got unexpected data: %v", conn.written) @@ -803,9 +803,9 @@ func TestAuthSwitchCachingSHA256PasswordFullRSA(t *testing.T) { expectedReplyPrefix := []byte{ // 1. Packet: Hash - 32, 0, 0, 3, 129, 93, 132, 95, 114, 48, 79, 215, 128, 62, 193, 118, 128, - 54, 75, 208, 159, 252, 227, 215, 129, 15, 242, 97, 19, 159, 31, 20, 58, - 153, 9, 130, + 32, 0, 0, 3, 219, 72, 64, 97, 56, 197, 167, 203, 64, 236, 168, 80, 223, + 56, 103, 217, 196, 176, 124, 60, 253, 41, 195, 10, 205, 190, 177, 206, 63, + 118, 211, 69, // 2. Packet: Pub Key Request 1, 0, 0, 5, 2, @@ -848,9 +848,9 @@ func TestAuthSwitchCachingSHA256PasswordFullRSAWithKey(t *testing.T) { expectedReplyPrefix := []byte{ // 1. Packet: Hash - 32, 0, 0, 3, 129, 93, 132, 95, 114, 48, 79, 215, 128, 62, 193, 118, 128, - 54, 75, 208, 159, 252, 227, 215, 129, 15, 242, 97, 19, 159, 31, 20, 58, - 153, 9, 130, + 32, 0, 0, 3, 219, 72, 64, 97, 56, 197, 167, 203, 64, 236, 168, 80, 223, + 56, 103, 217, 196, 176, 124, 60, 253, 41, 195, 10, 205, 190, 177, 206, 63, + 118, 211, 69, // 2. Packet: Encrypted Password 0, 1, 0, 5, // [changing bytes] @@ -891,9 +891,9 @@ func TestAuthSwitchCachingSHA256PasswordFullSecure(t *testing.T) { expectedReply := []byte{ // 1. Packet: Hash - 32, 0, 0, 3, 129, 93, 132, 95, 114, 48, 79, 215, 128, 62, 193, 118, 128, - 54, 75, 208, 159, 252, 227, 215, 129, 15, 242, 97, 19, 159, 31, 20, 58, - 153, 9, 130, + 32, 0, 0, 3, 219, 72, 64, 97, 56, 197, 167, 203, 64, 236, 168, 80, 223, + 56, 103, 217, 196, 176, 124, 60, 253, 41, 195, 10, 205, 190, 177, 206, 63, + 118, 211, 69, // 2. Packet: Cleartext password 7, 0, 0, 5, 115, 101, 99, 114, 101, 116, 0, diff --git a/packets.go b/packets.go index 9951bdf8..4b836216 100644 --- a/packets.go +++ b/packets.go @@ -510,6 +510,9 @@ func (mc *mysqlConn) readAuthResult() ([]byte, string, error) { } plugin := string(data[1:pluginEndIndex]) authData := data[pluginEndIndex+1:] + if len(authData) > 0 && authData[len(authData)-1] == 0 { + authData = authData[:len(authData)-1] + } return authData, plugin, nil default: // Error otherwise From 5d1bb8a9cf03422554dd52abf5eba89b8ca11307 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Tue, 18 Feb 2025 12:05:50 +0900 Subject: [PATCH 15/20] fix flaky test. (#1663) TestIssue1567 fails by max_connections error. This makes our CI unhappy. https://github.com/go-sql-driver/mysql/actions/runs/12904961433/job/35984402310 --- driver_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/driver_test.go b/driver_test.go index 58b3cb38..00e82865 100644 --- a/driver_test.go +++ b/driver_test.go @@ -3608,6 +3608,12 @@ func runCallCommand(dbt *DBTest, query, name string) { func TestIssue1567(t *testing.T) { // enable TLS. runTests(t, dsn+"&tls=skip-verify", func(dbt *DBTest) { + var max int + err := dbt.db.QueryRow("SELECT @@max_connections").Scan(&max) + if err != nil { + dbt.Fatalf("%s", err.Error()) + } + // disable connection pooling. // data race happens when new connection is created. dbt.db.SetMaxIdleConns(0) @@ -3627,6 +3633,9 @@ func TestIssue1567(t *testing.T) { if testing.Short() { count = 10 } + if count > max { + count = max + } for i := 0; i < count; i++ { timeout := time.Duration(mrand.Int63n(int64(rtt))) From 58941dd8a7888cf3d593d7bb182120e42168eac9 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Tue, 18 Feb 2025 12:37:58 +0900 Subject: [PATCH 16/20] release v1.9.0 (#1662) --- CHANGELOG.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c9bd9b1..d8c3aac1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,28 @@ +# Changelog + +## v1.9.0 (2025-02-18) + +### Major Changes + +- Implement zlib compression. (#1487) +- Supported Go version is updated to Go 1.21+. (#1639) +- Add support for VECTOR type introduced in MySQL 9.0. (#1609) +- Config object can have custom dial function. (#1527) + +### Bugfixes + +- Fix auth errors when username/password are too long. (#1625) +- Check if MySQL supports CLIENT_CONNECT_ATTRS before sending client attributes. (#1640) +- Fix auth switch request handling. (#1666) + +### Other changes + +- Add "filename:line" prefix to log in go-mysql. Custom loggers now show it. (#1589) +- Improve error handling. It reduces the "busy buffer" errors. (#1595, #1601, #1641) +- Use `strconv.Atoi` to parse max_allowed_packet. (#1661) +- `rejectReadOnly` option now handles ER_READ_ONLY_MODE (1290) error too. (#1660) + + ## Version 1.8.1 (2024-03-26) Bugfixes: From c87981610c07572d94be59d39550be1e3b1b5bb3 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Mon, 10 Mar 2025 11:33:49 +0900 Subject: [PATCH 17/20] add Charset() option (#1679) Fix #1664. --- dsn.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dsn.go b/dsn.go index 9b560b73..9bafab44 100644 --- a/dsn.go +++ b/dsn.go @@ -44,7 +44,6 @@ type Config struct { DBName string // Database name Params map[string]string // Connection parameters ConnectionAttributes string // Connection Attributes, comma-delimited string of user-defined "key:value" pairs - charsets []string // Connection charset. When set, this will be set in SET NAMES query Collation string // Connection collation. When set, this will be set in SET NAMES COLLATE query Loc *time.Location // Location for time.Time values MaxAllowedPacket int // Max packet size allowed @@ -81,6 +80,7 @@ type Config struct { beforeConnect func(context.Context, *Config) error // Invoked before a connection is established pubKey *rsa.PublicKey // Server public key timeTruncate time.Duration // Truncate time.Time values to the specified duration + charsets []string // Connection charset. When set, this will be set in SET NAMES query } // Functional Options Pattern @@ -135,6 +135,21 @@ func EnableCompression(yes bool) Option { } } +// Charset sets the connection charset and collation. +// +// charset is the connection charset. +// collation is the connection collation. It can be null or empty string. +// +// When collation is not specified, `SET NAMES ` command is sent when the connection is established. +// When collation is specified, `SET NAMES COLLATE ` command is sent when the connection is established. +func Charset(charset, collation string) Option { + return func(cfg *Config) error { + cfg.charsets = []string{charset} + cfg.Collation = collation + return nil + } +} + func (cfg *Config) Clone() *Config { cp := *cfg if cp.TLS != nil { From 88ff88b5915d34bde2b2c59991c586abb8ea9eca Mon Sep 17 00:00:00 2001 From: Bogdan Constantinescu Date: Mon, 10 Mar 2025 04:48:22 +0200 Subject: [PATCH 18/20] Fix FormatDSN missing ConnectionAttributes (#1619) Fix #1618 --- AUTHORS | 1 + dsn.go | 4 ++++ dsn_test.go | 6 +++++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/AUTHORS b/AUTHORS index 123b5dc5..510b869b 100644 --- a/AUTHORS +++ b/AUTHORS @@ -24,6 +24,7 @@ Artur Melanchyk Asta Xie B Lamarche Bes Dollma +Bogdan Constantinescu Brian Hendriks Bulat Gaifullin Caine Jette diff --git a/dsn.go b/dsn.go index 9bafab44..ecf62567 100644 --- a/dsn.go +++ b/dsn.go @@ -322,6 +322,10 @@ func (cfg *Config) FormatDSN() string { writeDSNParam(&buf, &hasParam, "columnsWithAlias", "true") } + if cfg.ConnectionAttributes != "" { + writeDSNParam(&buf, &hasParam, "connectionAttributes", url.QueryEscape(cfg.ConnectionAttributes)) + } + if cfg.compress { writeDSNParam(&buf, &hasParam, "compress", "true") } diff --git a/dsn_test.go b/dsn_test.go index 863d1482..436f7799 100644 --- a/dsn_test.go +++ b/dsn_test.go @@ -77,6 +77,9 @@ var testDSNs = []struct { }, { "user:password@/dbname?loc=UTC&timeout=30s&parseTime=true&timeTruncate=1h", &Config{User: "user", Passwd: "password", Net: "tcp", Addr: "127.0.0.1:3306", DBName: "dbname", Loc: time.UTC, Timeout: 30 * time.Second, ParseTime: true, MaxAllowedPacket: defaultMaxAllowedPacket, Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true, timeTruncate: time.Hour}, +}, { + "foo:bar@tcp(192.168.1.50:3307)/baz?timeout=10s&connectionAttributes=program_name:MySQLGoDriver%2FTest,program_version:1.2.3", + &Config{User: "foo", Passwd: "bar", Net: "tcp", Addr: "192.168.1.50:3307", DBName: "baz", Loc: time.UTC, Timeout: 10 * time.Second, MaxAllowedPacket: defaultMaxAllowedPacket, Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true, ConnectionAttributes: "program_name:MySQLGoDriver/Test,program_version:1.2.3"}, }, } @@ -109,7 +112,8 @@ func TestDSNParserInvalid(t *testing.T) { "User:pass@tcp(1.2.3.4:3306)", // no trailing slash "net()/", // unknown default addr "user:pass@tcp(127.0.0.1:3306)/db/name", // invalid dbname - "user:password@/dbname?allowFallbackToPlaintext=PREFERRED", // wrong bool flag + "user:password@/dbname?allowFallbackToPlaintext=PREFERRED", // wrong bool flag + "user:password@/dbname?connectionAttributes=attr1:/unescaped/value", // unescaped //"/dbname?arg=/some/unescaped/path", } From b84ac5af9c77b13e4c6417e484cbed087cd672f3 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Fri, 14 Mar 2025 22:05:00 +0900 Subject: [PATCH 19/20] go.mod: fix go version format (#1682) As of Go 1.21, toolchain versions must use the 1.N.P syntax. https://go.dev/doc/toolchain#version --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 33c4dd5b..187aff17 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/go-sql-driver/mysql -go 1.21 +go 1.21.0 require filippo.io/edwards25519 v1.1.0 From 1fbafa8082dab81e2c2e8caeb55d569dfeafcf94 Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Fri, 21 Mar 2025 11:04:43 +0900 Subject: [PATCH 20/20] release v1.9.1 (#1683) --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8c3aac1..a1b23c66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## v1.9.1 (2025-03-21) + +### Major Changes + +* Add Charset() option. (#1679) + +### Bugfixes + +* go.mod: fix go version format (#1682) +* Fix FormatDSN missing ConnectionAttributes (#1619) + ## v1.9.0 (2025-02-18) ### Major Changes