diff --git a/README.md b/README.md index 9742d3c73..08eae806e 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ type MyEventHandler struct { } func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { - log.Infof("%s %v\n", e.Action, e.Rows) + log.Debugf("%s %v\n", e.Action, e.Rows) return nil } diff --git a/canal/canal.go b/canal/canal.go index 4fd19a2b0..2e93a77a0 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -65,6 +65,10 @@ func NewCanal(cfg *Config) (*Canal, error) { c.ctx, c.cancel = context.WithCancel(context.Background()) + if cfg.WaitTimeBetweenConnectionSeconds > 0 { + cfg.WaitTimeBetweenConnectionSeconds = time.Duration(5) * time.Second + } + c.dumpDoneCh = make(chan struct{}) c.eventHandler = &DummyEventHandler{} c.parser = parser.New() @@ -192,6 +196,7 @@ func (c *Canal) RunFrom(pos mysql.Position) error { return c.Run() } +// Start from selected GTIDSet func (c *Canal) StartFromGTID(set mysql.GTIDSet) error { c.master.UpdateGTIDSet(set) @@ -238,15 +243,17 @@ func (c *Canal) run() error { } func (c *Canal) Close() { - log.Infof("closing canal") + log.Debugf("closing canal") c.m.Lock() defer c.m.Unlock() c.cancel() c.syncer.Close() c.connLock.Lock() - c.conn.Close() - c.conn = nil + if c.conn != nil { + c.conn.Close() + c.conn = nil + } c.connLock.Unlock() _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) @@ -413,20 +420,21 @@ func (c *Canal) checkBinlogRowFormat() error { func (c *Canal) prepareSyncer() error { cfg := replication.BinlogSyncerConfig{ - ServerID: c.cfg.ServerID, - Flavor: c.cfg.Flavor, - User: c.cfg.User, - Password: c.cfg.Password, - Charset: c.cfg.Charset, - HeartbeatPeriod: c.cfg.HeartbeatPeriod, - ReadTimeout: c.cfg.ReadTimeout, - UseDecimal: c.cfg.UseDecimal, - ParseTime: c.cfg.ParseTime, - SemiSyncEnabled: c.cfg.SemiSyncEnabled, - MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, - DisableRetrySync: c.cfg.DisableRetrySync, - TimestampStringLocation: c.cfg.TimestampStringLocation, - TLSConfig: c.cfg.TLSConfig, + ServerID: c.cfg.ServerID, + Flavor: c.cfg.Flavor, + User: c.cfg.User, + Password: c.cfg.Password, + Charset: c.cfg.Charset, + HeartbeatPeriod: c.cfg.HeartbeatPeriod, + ReadTimeout: c.cfg.ReadTimeout, + UseDecimal: c.cfg.UseDecimal, + ParseTime: c.cfg.ParseTime, + SemiSyncEnabled: c.cfg.SemiSyncEnabled, + MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, + DisableRetrySync: c.cfg.DisableRetrySync, + TimestampStringLocation: c.cfg.TimestampStringLocation, + TLSConfig: c.cfg.TLSConfig, + WaitTimeBetweenConnectionSeconds: c.cfg.WaitTimeBetweenConnectionSeconds, } if strings.Contains(c.cfg.Addr, "/") { diff --git a/canal/canal_test.go b/canal/canal_test.go index 7f4f4c7d7..e1c8ad7e3 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -111,7 +111,7 @@ type testEventHandler struct { } func (h *testEventHandler) OnRow(e *RowsEvent) error { - log.Infof("OnRow %s %v\n", e.Action, e.Rows) + log.Debugf("OnRow %s %v\n", e.Action, e.Rows) umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32 if ok && (umi != umiA && umi != umiB && umi != umiC) { return fmt.Errorf("invalid unsigned medium int %d", umi) @@ -141,10 +141,48 @@ func (s *canalTestSuite) TestCanal(c *C) { s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`") s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18") - err := s.c.CatchMasterPos(10 * time.Second) + err := CatchMasterPos(s.c, 10*time.Second) c.Assert(err, IsNil) } +func CatchMasterPos(c *Canal, timeout time.Duration) error { + pos, err := c.GetMasterPos() + if err != nil { + return errors.Trace(err) + } + + return WaitUntilPos(c, pos, timeout) +} + +func FlushBinlog(c *Canal) error { + _, err := c.Execute("FLUSH BINARY LOGS") + return errors.Trace(err) +} + +func WaitUntilPos(c *Canal, pos mysql.Position, timeout time.Duration) error { + timer := time.NewTimer(timeout) + for { + select { + case <-timer.C: + return errors.Errorf("wait position %v too long > %s", pos, timeout) + default: + err := FlushBinlog(c) + if err != nil { + return errors.Trace(err) + } + curPos := c.master.Position() + if curPos.Compare(pos) >= 0 { + return nil + } else { + log.Debugf("master pos is %v, wait catching %v", curPos, pos) + time.Sleep(100 * time.Millisecond) + } + } + } + + return nil +} + func (s *canalTestSuite) TestCanalFilter(c *C) { // included sch, err := s.c.GetTable("test", "canal_test") diff --git a/canal/config.go b/canal/config.go index 5b97fae7a..c14d6d2d9 100644 --- a/canal/config.go +++ b/canal/config.go @@ -81,6 +81,9 @@ type Config struct { // this configuration will not work if DisableRetrySync is true MaxReconnectAttempts int `toml:"max_reconnect_attempts"` + // WaitTimeBetweenConnectionSeconds is the time to wait before retrying to connect. + WaitTimeBetweenConnectionSeconds time.Duration `toml:"max_wait_time_between_connection_seconds"` + // whether disable re-sync for broken connection DisableRetrySync bool `toml:"disable_retry_sync"` diff --git a/canal/dump.go b/canal/dump.go index 59af3a433..113be19b8 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -163,13 +163,13 @@ func (c *Canal) dump() error { if err != nil { return errors.Trace(err) } - log.Infof("skip master data, get current binlog position %v", pos) + log.Debugf("skip master data, get current binlog position %v", pos) h.name = pos.Name h.pos = uint64(pos.Pos) } start := time.Now() - log.Info("try dump MySQL and parse") + log.Debug("try dump MySQL and parse") if err := c.dumper.DumpAndParse(h); err != nil { return errors.Trace(err) } @@ -185,7 +185,7 @@ func (c *Canal) dump() error { c.master.UpdateGTIDSet(h.gset) startPos = h.gset } - log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s", + log.Debugf("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s", time.Now().Sub(start).Seconds(), startPos) return nil } @@ -196,12 +196,12 @@ func (c *Canal) tryDump() error { if (len(pos.Name) > 0 && pos.Pos > 0) || (gset != nil && gset.String() != "") { // we will sync with binlog name and position - log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset) + log.Debugf("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset) return nil } if c.dumper == nil { - log.Info("skip dump, no mysqldump") + log.Debug("skip dump, no mysqldump") return nil } diff --git a/canal/sync.go b/canal/sync.go index e03d01267..231baa6eb 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -2,6 +2,7 @@ package canal import ( "fmt" + "strings" "sync/atomic" "time" @@ -22,7 +23,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { if err != nil { return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err) } - log.Infof("start sync binlog at binlog file %v", pos) + log.Debugf("start sync binlog at binlog file %v", pos) return s, nil } else { gsetClone := gset.Clone() @@ -30,7 +31,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { if err != nil { return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err) } - log.Infof("start sync binlog at GTID set %v", gsetClone) + log.Debugf("start sync binlog at GTID set %v", gsetClone) return s, nil } } @@ -65,7 +66,7 @@ func (c *Canal) runSyncBinlog() error { switch e := ev.Event.(type) { case *replication.RotateEvent: fakeRotateLogName = string(e.NextLogName) - log.Infof("received fake rotate event, next log name is %s", e.NextLogName) + log.Debugf("received fake rotate event, next log name is %s", e.NextLogName) } continue @@ -92,7 +93,7 @@ func (c *Canal) runSyncBinlog() error { case *replication.RotateEvent: pos.Name = string(e.NextLogName) pos.Pos = uint32(e.Position) - log.Infof("rotate binlog to %s", pos) + log.Debugf("rotate binlog to %s", pos) savePos = true force = true if err = c.eventHandler.OnRotate(e); err != nil { @@ -142,7 +143,15 @@ func (c *Canal) runSyncBinlog() error { case *replication.QueryEvent: stmts, _, err := c.parser.Parse(string(e.Query), "", "") if err != nil { - log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) + msg := err.Error() + if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) { + // Cut the first row from the message since it contain the procedure call and not the entire message + fl := strings.Split(msg, "\n") + log.Debugf("parse SP Error: (%s)", fl[0]) + } else { + log.Debugf("parse query(%s) err %v", e.Query, err) + } + log.Debugln("will skip this event") continue } for _, stmt := range stmts { @@ -232,7 +241,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { func (c *Canal) updateTable(db, table string) (err error) { c.ClearTableCache([]byte(db), []byte(table)) - log.Infof("table structure changed, clear table cache: %s.%s\n", db, table) + log.Debugf("table structure changed, clear table cache: %s.%s\n", db, table) if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { return errors.Trace(err) } @@ -270,38 +279,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) + events.Header.Gtid = c.SyncedGTIDSet() return c.eventHandler.OnRow(events) } -func (c *Canal) FlushBinlog() error { - _, err := c.Execute("FLUSH BINARY LOGS") - return errors.Trace(err) -} - -func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { - timer := time.NewTimer(timeout) - for { - select { - case <-timer.C: - return errors.Errorf("wait position %v too long > %s", pos, timeout) - default: - err := c.FlushBinlog() - if err != nil { - return errors.Trace(err) - } - curPos := c.master.Position() - if curPos.Compare(pos) >= 0 { - return nil - } else { - log.Debugf("master pos is %v, wait catching %v", curPos, pos) - time.Sleep(100 * time.Millisecond) - } - } - } - - return nil -} - func (c *Canal) GetMasterPos() (mysql.Position, error) { rr, err := c.Execute("SHOW MASTER STATUS") if err != nil { @@ -336,12 +317,3 @@ func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) { } return gset, nil } - -func (c *Canal) CatchMasterPos(timeout time.Duration) error { - pos, err := c.GetMasterPos() - if err != nil { - return errors.Trace(err) - } - - return c.WaitUntilPos(pos, timeout) -} diff --git a/dump/dumper.go b/dump/dumper.go index 038624885..3fe1db89f 100644 --- a/dump/dumper.go +++ b/dump/dumper.go @@ -241,7 +241,7 @@ func (d *Dumper) Dump(w io.Writer) error { } args[passwordArgIndex] = "--password=******" - log.Infof("exec mysqldump with %v", args) + log.Debugf("exec mysqldump with %v", args) args[passwordArgIndex] = passwordArg cmd := exec.Command(d.ExecutionPath, args...) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 6d0111429..6521d11aa 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "os" + "strings" "sync" "time" @@ -18,7 +19,8 @@ import ( ) var ( - errSyncRunning = errors.New("Sync is running, must Close first") + errSyncRunning = errors.New("Sync is running, must Close first") + maxHostNameLength = 60 ) // BinlogSyncerConfig is the configuration for BinlogSyncer. @@ -28,6 +30,9 @@ type BinlogSyncerConfig struct { // Flavor is "mysql" or "mariadb", if not set, use "mysql" default. Flavor string + // MaxWaitTimeBetween Check connection + WaitTimeBetweenConnectionSeconds time.Duration + // Host is for MySQL server host. Host string // Port is for MySQL server port. @@ -145,7 +150,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { // Clear the Password to avoid outputing it in log. pass := cfg.Password cfg.Password = "" - log.Infof("create BinlogSyncer with config %v", cfg) + log.Debugf("create BinlogSyncer with config %v", cfg) cfg.Password = pass b := new(BinlogSyncer) @@ -158,6 +163,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation) b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) + b.parser.SetCharset(b.cfg.Charset) b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) @@ -177,7 +183,7 @@ func (b *BinlogSyncer) close() { return } - log.Info("syncer is closing...") + log.Debug("syncer is closing...") b.running = false b.cancel() @@ -207,7 +213,7 @@ func (b *BinlogSyncer) close() { b.c.Close() } - log.Info("syncer is closed") + log.Debug("syncer is closed") } func (b *BinlogSyncer) isClosed() bool { @@ -371,7 +377,7 @@ func (b *BinlogSyncer) GetNextPosition() Position { // StartSync starts syncing from the `pos` position. func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { - log.Infof("begin to sync binlog from position %s", pos) + log.Debugf("begin to sync binlog from position %s", pos) b.m.Lock() defer b.m.Unlock() @@ -389,7 +395,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { // StartSyncGTID starts syncing from the `gset` GTIDSet. func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { - log.Infof("begin to sync binlog from GTID set %s", gset) + log.Debugf("begin to sync binlog from GTID set %s", gset) b.prevGset = gset @@ -520,7 +526,10 @@ func (b *BinlogSyncer) writeRegisterSlaveCommand() error { b.c.ResetSequence() hostname := b.localHostname() - + if len(hostname) > maxHostNameLength { + runes := []rune(hostname) + hostname = string(runes[0:maxHostNameLength]) + } // This should be the name of slave host not the host we are connecting to. data := make([]byte, 4+1+4+1+len(hostname)+1+len(b.cfg.User)+1+len(b.cfg.Password)+2+4+4) pos := 4 @@ -593,13 +602,13 @@ func (b *BinlogSyncer) retrySync() error { if b.currGset != nil { msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset) } - log.Infof(msg) + log.Debugf(msg) if err := b.prepareSyncGTID(b.prevGset); err != nil { return errors.Trace(err) } } else { - log.Infof("begin to re-sync from %s", b.nextPos) + log.Debugf("begin to re-sync from %s", b.nextPos) if err := b.prepareSyncPos(b.nextPos); err != nil { return errors.Trace(err) } @@ -684,6 +693,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } for { + // Read packet until CTX is done if not will continue after the loop select { case <-b.ctx.Done(): s.close() @@ -697,7 +707,8 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { return } - log.Errorf("retry sync err: %v, wait 1s and retry again", err) + log.Errorf("retry sync err: %v, wait %s s and retry again", err, b.cfg.WaitTimeBetweenConnectionSeconds) + time.Sleep(b.cfg.WaitTimeBetweenConnectionSeconds) continue } } @@ -719,7 +730,11 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(s, data); err != nil { + if err = b.parseEvent(b.nextPos.Name, s, data); err != nil { + // if the error is errMissingTableMapEvent skip + if strings.Contains(strings.ToLower(errors.Cause(err).Error()), errMissingTableMapEvent.Error()) { + continue + } s.closeWithError(err) return } @@ -731,7 +746,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { // refer to https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block // when COM_BINLOG_DUMP command use BINLOG_DUMP_NON_BLOCK flag, // if there is no more event to send an EOF_Packet instead of blocking the connection - log.Info("receive EOF packet, no more binlog event now.") + log.Debug("receive EOF packet, no more binlog event now.") continue default: log.Errorf("invalid stream header %c", data[0]) @@ -740,7 +755,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } } -func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { +func (b *BinlogSyncer) parseEvent(fileName string, s *BinlogStreamer, data []byte) error { //skip OK byte, 0x00 data = data[1:] @@ -751,7 +766,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { data = data[2:] } - e, err := b.parser.Parse(data) + e, err := b.parser.Parse(fileName, data) if err != nil { return errors.Trace(err) } @@ -787,7 +802,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { case *RotateEvent: b.nextPos.Name = string(event.NextLogName) b.nextPos.Pos = uint32(event.Position) - log.Infof("rotate to %s", b.nextPos) + log.Debugf("rotate to %s", b.nextPos) case *GTIDEvent: if b.prevGset == nil { break @@ -860,5 +875,5 @@ func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) { log.Error(errors.Trace(err)) } } - log.Infof("kill last connection id %d", id) + log.Debugf("kill last connection id %d", id) } diff --git a/replication/event.go b/replication/event.go index 9111691d2..56a5e0d7a 100644 --- a/replication/event.go +++ b/replication/event.go @@ -66,6 +66,8 @@ type EventHeader struct { EventSize uint32 LogPos uint32 Flags uint16 + FileName string + Gtid GTIDSet } func (h *EventHeader) Decode(data []byte) error { diff --git a/replication/parser.go b/replication/parser.go index 7688fe7b0..cee6b060c 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -22,7 +22,8 @@ var ( type BinlogParser struct { // "mysql" or "mariadb", if not set, use "mysql" by default - flavor string + flavor string + charset string format *FormatDescriptionEvent @@ -86,7 +87,7 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - if err = p.parseFormatDescriptionEvent(f, onEvent); err != nil { + if err = p.parseFormatDescriptionEvent(f, name, onEvent); err != nil { return errors.Annotatef(err, "parse FormatDescriptionEvent") } } @@ -95,20 +96,20 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - return p.ParseReader(f, onEvent) + return p.ParseReader(f, name, onEvent) } -func (p *BinlogParser) parseFormatDescriptionEvent(r io.Reader, onEvent OnEventFunc) error { - _, err := p.parseSingleEvent(r, onEvent) +func (p *BinlogParser) parseFormatDescriptionEvent(r io.Reader, fileName string, onEvent OnEventFunc) error { + _, err := p.parseSingleEvent(r, fileName, onEvent) return err } // ParseSingleEvent parses single binlog event and passes the event to onEvent function. -func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error) { - return p.parseSingleEvent(r, onEvent) +func (p *BinlogParser) ParseSingleEvent(r io.Reader, fileName string, onEvent OnEventFunc) (bool, error) { + return p.parseSingleEvent(r, fileName, onEvent) } -func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error) { +func (p *BinlogParser) parseSingleEvent(r io.Reader, fileName string, onEvent OnEventFunc) (bool, error) { var err error var n int64 @@ -123,7 +124,7 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, } var h *EventHeader - h, err = p.parseHeader(buf.Bytes()) + h, err = p.parseHeader(fileName, buf.Bytes()) if err != nil { return false, errors.Trace(err) } @@ -162,13 +163,13 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, return false, nil } -func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { +func (p *BinlogParser) ParseReader(r io.Reader, name string, onEvent OnEventFunc) error { for { if atomic.LoadUint32(&p.stopProcessing) == 1 { break } - done, err := p.parseSingleEvent(r, onEvent) + done, err := p.parseSingleEvent(r, name, onEvent) if err != nil { if err == errMissingTableMapEvent { continue @@ -212,13 +213,18 @@ func (p *BinlogParser) SetFlavor(flavor string) { p.flavor = flavor } -func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) { +func (p *BinlogParser) SetCharset(charset string) { + p.charset = charset +} + +func (p *BinlogParser) parseHeader(fileName string, data []byte) (*EventHeader, error) { h := new(EventHeader) err := h.Decode(data) if err != nil { return nil, err } + h.FileName = fileName return h, nil } @@ -302,6 +308,7 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( } if te, ok := e.(*TableMapEvent); ok { + te.charset = p.charset p.tables[te.TableID] = te } @@ -321,10 +328,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( // into the parser for this to work properly on any given event. // Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace // an existing one. -func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { +func (p *BinlogParser) Parse(fileName string, data []byte) (*BinlogEvent, error) { rawData := data - h, err := p.parseHeader(data) + h, err := p.parseHeader(fileName, data) if err != nil { return nil, err diff --git a/replication/parser_test.go b/replication/parser_test.go index 2f957198c..f81c7908a 100644 --- a/replication/parser_test.go +++ b/replication/parser_test.go @@ -31,7 +31,7 @@ func (t *testSyncerSuite) TestIndexOutOfRange(c *C) { 0x3065f: {tableIDSize: 6, TableID: 0x3065f, Flags: 0x1, Schema: []uint8{0x73, 0x65, 0x69, 0x75, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72}, Table: []uint8{0x63, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x6f, 0x75, 0x74, 0x5f, 0x6c, 0x65, 0x74, 0x74, 0x65, 0x72}, ColumnCount: 0xd, ColumnType: []uint8{0x3, 0x3, 0x3, 0x3, 0x1, 0x12, 0xf, 0xf, 0x12, 0xf, 0xf, 0x3, 0xf}, ColumnMeta: []uint16{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x180, 0x180, 0x0, 0x180, 0x180, 0x0, 0x2fd}, NullBitmap: []uint8{0xe0, 0x17}}, } - _, err := parser.Parse([]byte{ + _, err := parser.Parse("test", []byte{ /* 0x00, */ 0xc1, 0x86, 0x8e, 0x55, 0x1e, 0xa5, 0x14, 0x80, 0xa, 0x55, 0x0, 0x0, 0x0, 0x7, 0xc, 0xbf, 0xe, 0x0, 0x0, 0x5f, 0x6, 0x3, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0xd, 0xff, 0x0, 0x0, 0x19, 0x63, 0x7, 0x0, 0xca, 0x61, 0x5, 0x0, 0x5e, 0xf7, 0xc, 0x0, 0xf5, 0x7, @@ -63,14 +63,14 @@ func (t *testSyncerSuite) TestParseEvent(c *C) { for _, tc := range testCases { r := bytes.NewReader(tc.byteData) - _, err := parser.ParseSingleEvent(r, func(e *BinlogEvent) error { + _, err := parser.ParseSingleEvent(r, "test", func(e *BinlogEvent) error { c.Assert(e.Header.EventType, Equals, STOP_EVENT) c.Assert(e.Header.EventSize, Equals, tc.eventSize) return nil }) c.Assert(err, IsNil) - e, err2 := parser.Parse(tc.byteData) + e, err2 := parser.Parse("test", tc.byteData) c.Assert(e.Header.EventType, Equals, STOP_EVENT) c.Assert(e.Header.EventSize, Equals, tc.eventSize) c.Assert(err2, IsNil) diff --git a/replication/row_event.go b/replication/row_event.go index 1729aefb7..85c61c834 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -4,10 +4,13 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "golang.org/x/text/encoding/charmap" + "golang.org/x/text/transform" "io" "strconv" "strings" "time" + "unicode/utf8" "github.com/pingcap/errors" "github.com/shopspring/decimal" @@ -22,6 +25,7 @@ var errMissingTableMapEvent = errors.New("invalid table id, no corresponding tab type TableMapEvent struct { flavor string tableIDSize int + charset string TableID uint64 @@ -1059,7 +1063,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ n = 4 t := binary.LittleEndian.Uint32(data) if t == 0 { - v = formatZeroTime(0, 0) + v = nil } else { v = e.parseFracTime(fracTime{ Time: time.Unix(int64(t), 0), @@ -1074,7 +1078,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ n = 8 i64 := binary.LittleEndian.Uint64(data) if i64 == 0 { - v = formatZeroTime(0, 0) + v = nil } else { d := i64 / 1000000 t := i64 % 1000000 @@ -1109,7 +1113,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ n = 3 i32 := uint32(FixedLengthInt(data[0:3])) if i32 == 0 { - v = "0000-00-00" + v = nil } else { v = fmt.Sprintf("%04d-%02d-%02d", i32/(16*32), i32/32%16, i32%32) } @@ -1141,17 +1145,25 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ v, err = littleDecodeBit(data, nbits, n) case MYSQL_TYPE_BLOB: v, n, err = decodeBlob(data, meta) + newValue, ok := convertToString(v) + if ok { + v = newValue + } case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - v, n = decodeString(data, length) + v, n = decodeByCharSet(data, e.Table.charset, length) case MYSQL_TYPE_STRING: - v, n = decodeString(data, length) + v, n = decodeByCharSet(data, e.Table.charset, length) case MYSQL_TYPE_JSON: // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 length = int(FixedLengthInt(data[0:meta])) n = length + int(meta) v, err = e.decodeJsonBinary(data[meta:n]) + newValue, ok := convertToString(v) + if ok { + v = newValue + } case MYSQL_TYPE_GEOMETRY: // MySQL saves Geometry as Blob in binlog // Seem that the binary format is SRID (4 bytes) + WKB, outer can use @@ -1167,6 +1179,31 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ return v, n, err } +// convertToString receive an interface and convert it to string if match the desired type +func convertToString(s interface{}) (string, bool) { + if s == nil { + return "", false + } + switch v := s.(type) { + case []uint8: + str := string(v) + return str, true + default: + return "", false + } +} + +func decodeByCharSet(data []byte, charset string, length int) (v string, n int) { + if !utf8.Valid(data) { + if charset == "latin1" { + return decodeStringLatin1(data, length) + } else { + fmt.Printf("Data is not in utf8 or latin1") + } + } + return decodeString(data, length) +} + func decodeString(data []byte, length int) (v string, n int) { if length < 256 { length = int(data[0]) @@ -1182,6 +1219,29 @@ func decodeString(data []byte, length int) (v string, n int) { return } +func decodeStringLatin1(data []byte, length int) (v string, n int) { + // Define the Latin1 decoder + decoder := charmap.ISO8859_1.NewDecoder() + + if length < 256 { + // If the length is smaller than 256, extract the length from the first byte + length = int(data[0]) + n = length + 1 + // Use the decoder to convert to a string with Latin1 encoding + decodedBytes, _, _ := transform.Bytes(decoder, data[1:n]) + v = string(decodedBytes) + } else { + // If the length is larger, extract it using LittleEndian + length = int(binary.LittleEndian.Uint16(data[0:])) + n = length + 2 + // Use the decoder to convert to a string with Latin1 encoding + decodedBytes, _, _ := transform.Bytes(decoder, data[2:n]) + v = string(decodedBytes) + } + + return +} + // ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137 const digitsPerInteger int = 9 @@ -1373,7 +1433,7 @@ func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Loc } if sec == 0 { - return formatZeroTime(int(usec), int(dec)), n, nil + return nil, n, nil } return fracTime{ @@ -1402,7 +1462,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { } if intPart == 0 { - return formatZeroTime(int(frac), int(dec)), n, nil + return nil, n, nil } tmp := intPart<<24 + frac diff --git a/replication/row_event_test.go b/replication/row_event_test.go index d81277ff6..99907e7be 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1,7 +1,10 @@ package replication import ( + "bytes" + "encoding/binary" "fmt" + "testing" . "github.com/pingcap/check" "github.com/shopspring/decimal" @@ -1341,3 +1344,100 @@ func (_ *testDecodeSuite) BenchmarkInt(c *C) { } } } + +func TestDecodeStringLatin1(t *testing.T) { + tests := []struct { + name string + input []byte + length int + wantStr string + wantRead int + }{ + { + name: "Short Latin1 string", + input: append([]byte{5}, []byte{0xe2, 'f', 'g', 'h', 0xe9}...), // length byte + 5 bytes: 'âfghé' + length: 5, + wantStr: "âfghé", + wantRead: 6, + }, + { + name: "Empty string", + input: []byte{0}, + length: 0, + wantStr: "", + wantRead: 1, + }, + { + name: "Long string (>255, 2-byte length)", + input: func() []byte { + buf := new(bytes.Buffer) + binary.Write(buf, binary.LittleEndian, uint16(5)) + buf.Write([]byte{0xe2, 'f', 'g', 'h', 0xe9}) // 'âfghé' + return buf.Bytes() + }(), + length: 300, + wantStr: "âfghé", + wantRead: 7, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStr, gotRead := decodeStringLatin1(tt.input, tt.length) + if gotStr != tt.wantStr { + t.Errorf("decodeStringLatin1() got string = %q, want %q", gotStr, tt.wantStr) + } + if gotRead != tt.wantRead { + t.Errorf("decodeStringLatin1() got read bytes = %d, want %d", gotRead, tt.wantRead) + } + }) + } +} + +func TestDecodeByCharSet(t *testing.T) { + tests := []struct { + name string + input []byte + charset string + length int + wantStr string + }{ + { + name: "Valid UTF-8 with UTF-8 charset", + input: append([]byte{11}, []byte("hello world")...), + charset: "utf8", + length: 11, + wantStr: "hello world", + }, + { + name: "Invalid UTF-8 but charset is latin1", + input: append([]byte{3}, []byte{0xe2, 0x28, 0xa1}...), // invalid utf8 + charset: "latin1", + length: 3, + wantStr: "â(¡", + }, + { + name: "Invalid UTF-8 but charset is utf8", + input: append([]byte{3}, []byte{0xe2, 0x28, 0xa1}...), // invalid utf8 + charset: "utf8", + length: 3, + wantStr: "\xe2(\xa1", + }, + { + name: "Valid UTF-8 and charset latin1", + input: append([]byte{11}, []byte("hello world")...), + charset: "latin1", + length: 11, + wantStr: "hello world", // because utf8.Valid is true, decodeString should be used + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStr, _ := decodeByCharSet(tt.input, tt.charset, tt.length) + if gotStr != tt.wantStr { + t.Errorf("decodeByCharSet() = %q, want %q", gotStr, tt.wantStr) + } + }) + } +}