From ef55c4cc8984efaff6f56756043324a9d7831b4d Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Thu, 6 Jan 2022 15:02:20 +0200 Subject: [PATCH 01/27] Added waitTimeBetweenConnection check --- README.md | 2 +- canal/canal.go | 36 +++++++++++++++++++++--------------- canal/canal_test.go | 2 +- canal/config.go | 3 +++ canal/dump.go | 10 +++++----- canal/sync.go | 10 +++++----- dump/dumper.go | 2 +- replication/binlogsyncer.go | 27 ++++++++++++++++----------- 8 files changed, 53 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 9742d3c73..08eae806e 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ type MyEventHandler struct { } func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { - log.Infof("%s %v\n", e.Action, e.Rows) + log.Debugf("%s %v\n", e.Action, e.Rows) return nil } diff --git a/canal/canal.go b/canal/canal.go index 4fd19a2b0..86dd13bee 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -65,6 +65,10 @@ func NewCanal(cfg *Config) (*Canal, error) { c.ctx, c.cancel = context.WithCancel(context.Background()) + if cfg.WaitTimeBetweenConnectionSeconds > 0 { + cfg.WaitTimeBetweenConnectionSeconds = time.Duration(5) * time.Second + } + c.dumpDoneCh = make(chan struct{}) c.eventHandler = &DummyEventHandler{} c.parser = parser.New() @@ -192,6 +196,7 @@ func (c *Canal) RunFrom(pos mysql.Position) error { return c.Run() } +// Start from selected GTIDSet func (c *Canal) StartFromGTID(set mysql.GTIDSet) error { c.master.UpdateGTIDSet(set) @@ -238,7 +243,7 @@ func (c *Canal) run() error { } func (c *Canal) Close() { - log.Infof("closing canal") + log.Debugf("closing canal") c.m.Lock() defer c.m.Unlock() @@ -413,20 +418,21 @@ func (c *Canal) checkBinlogRowFormat() error { func (c *Canal) prepareSyncer() error { cfg := replication.BinlogSyncerConfig{ - ServerID: c.cfg.ServerID, - Flavor: c.cfg.Flavor, - User: c.cfg.User, - Password: c.cfg.Password, - Charset: c.cfg.Charset, - HeartbeatPeriod: c.cfg.HeartbeatPeriod, - ReadTimeout: c.cfg.ReadTimeout, - UseDecimal: c.cfg.UseDecimal, - ParseTime: c.cfg.ParseTime, - SemiSyncEnabled: c.cfg.SemiSyncEnabled, - MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, - DisableRetrySync: c.cfg.DisableRetrySync, - TimestampStringLocation: c.cfg.TimestampStringLocation, - TLSConfig: c.cfg.TLSConfig, + ServerID: c.cfg.ServerID, + Flavor: c.cfg.Flavor, + User: c.cfg.User, + Password: c.cfg.Password, + Charset: c.cfg.Charset, + HeartbeatPeriod: c.cfg.HeartbeatPeriod, + ReadTimeout: c.cfg.ReadTimeout, + UseDecimal: c.cfg.UseDecimal, + ParseTime: c.cfg.ParseTime, + SemiSyncEnabled: c.cfg.SemiSyncEnabled, + MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, + DisableRetrySync: c.cfg.DisableRetrySync, + TimestampStringLocation: c.cfg.TimestampStringLocation, + TLSConfig: c.cfg.TLSConfig, + WaitTimeBetweenConnectionSeconds: c.cfg.WaitTimeBetweenConnectionSeconds, } if strings.Contains(c.cfg.Addr, "/") { diff --git a/canal/canal_test.go b/canal/canal_test.go index 7f4f4c7d7..a5bb6ed92 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -111,7 +111,7 @@ type testEventHandler struct { } func (h *testEventHandler) OnRow(e *RowsEvent) error { - log.Infof("OnRow %s %v\n", e.Action, e.Rows) + log.Debugf("OnRow %s %v\n", e.Action, e.Rows) umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32 if ok && (umi != umiA && umi != umiB && umi != umiC) { return fmt.Errorf("invalid unsigned medium int %d", umi) diff --git a/canal/config.go b/canal/config.go index 5b97fae7a..c14d6d2d9 100644 --- a/canal/config.go +++ b/canal/config.go @@ -81,6 +81,9 @@ type Config struct { // this configuration will not work if DisableRetrySync is true MaxReconnectAttempts int `toml:"max_reconnect_attempts"` + // WaitTimeBetweenConnectionSeconds is the time to wait before retrying to connect. + WaitTimeBetweenConnectionSeconds time.Duration `toml:"max_wait_time_between_connection_seconds"` + // whether disable re-sync for broken connection DisableRetrySync bool `toml:"disable_retry_sync"` diff --git a/canal/dump.go b/canal/dump.go index 59af3a433..113be19b8 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -163,13 +163,13 @@ func (c *Canal) dump() error { if err != nil { return errors.Trace(err) } - log.Infof("skip master data, get current binlog position %v", pos) + log.Debugf("skip master data, get current binlog position %v", pos) h.name = pos.Name h.pos = uint64(pos.Pos) } start := time.Now() - log.Info("try dump MySQL and parse") + log.Debug("try dump MySQL and parse") if err := c.dumper.DumpAndParse(h); err != nil { return errors.Trace(err) } @@ -185,7 +185,7 @@ func (c *Canal) dump() error { c.master.UpdateGTIDSet(h.gset) startPos = h.gset } - log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s", + log.Debugf("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s", time.Now().Sub(start).Seconds(), startPos) return nil } @@ -196,12 +196,12 @@ func (c *Canal) tryDump() error { if (len(pos.Name) > 0 && pos.Pos > 0) || (gset != nil && gset.String() != "") { // we will sync with binlog name and position - log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset) + log.Debugf("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset) return nil } if c.dumper == nil { - log.Info("skip dump, no mysqldump") + log.Debug("skip dump, no mysqldump") return nil } diff --git a/canal/sync.go b/canal/sync.go index e03d01267..94bd3f6da 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -22,7 +22,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { if err != nil { return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err) } - log.Infof("start sync binlog at binlog file %v", pos) + log.Debugf("start sync binlog at binlog file %v", pos) return s, nil } else { gsetClone := gset.Clone() @@ -30,7 +30,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { if err != nil { return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err) } - log.Infof("start sync binlog at GTID set %v", gsetClone) + log.Debugf("start sync binlog at GTID set %v", gsetClone) return s, nil } } @@ -65,7 +65,7 @@ func (c *Canal) runSyncBinlog() error { switch e := ev.Event.(type) { case *replication.RotateEvent: fakeRotateLogName = string(e.NextLogName) - log.Infof("received fake rotate event, next log name is %s", e.NextLogName) + log.Debugf("received fake rotate event, next log name is %s", e.NextLogName) } continue @@ -92,7 +92,7 @@ func (c *Canal) runSyncBinlog() error { case *replication.RotateEvent: pos.Name = string(e.NextLogName) pos.Pos = uint32(e.Position) - log.Infof("rotate binlog to %s", pos) + log.Debugf("rotate binlog to %s", pos) savePos = true force = true if err = c.eventHandler.OnRotate(e); err != nil { @@ -232,7 +232,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { func (c *Canal) updateTable(db, table string) (err error) { c.ClearTableCache([]byte(db), []byte(table)) - log.Infof("table structure changed, clear table cache: %s.%s\n", db, table) + log.Debugf("table structure changed, clear table cache: %s.%s\n", db, table) if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { return errors.Trace(err) } diff --git a/dump/dumper.go b/dump/dumper.go index 038624885..3fe1db89f 100644 --- a/dump/dumper.go +++ b/dump/dumper.go @@ -241,7 +241,7 @@ func (d *Dumper) Dump(w io.Writer) error { } args[passwordArgIndex] = "--password=******" - log.Infof("exec mysqldump with %v", args) + log.Debugf("exec mysqldump with %v", args) args[passwordArgIndex] = passwordArg cmd := exec.Command(d.ExecutionPath, args...) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 6d0111429..10b25b04b 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -28,6 +28,9 @@ type BinlogSyncerConfig struct { // Flavor is "mysql" or "mariadb", if not set, use "mysql" default. Flavor string + // MaxWaitTimeBetween Check connection + WaitTimeBetweenConnectionSeconds time.Duration + // Host is for MySQL server host. Host string // Port is for MySQL server port. @@ -145,7 +148,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { // Clear the Password to avoid outputing it in log. pass := cfg.Password cfg.Password = "" - log.Infof("create BinlogSyncer with config %v", cfg) + log.Debugf("create BinlogSyncer with config %v", cfg) cfg.Password = pass b := new(BinlogSyncer) @@ -177,7 +180,7 @@ func (b *BinlogSyncer) close() { return } - log.Info("syncer is closing...") + log.Debug("syncer is closing...") b.running = false b.cancel() @@ -207,7 +210,7 @@ func (b *BinlogSyncer) close() { b.c.Close() } - log.Info("syncer is closed") + log.Debug("syncer is closed") } func (b *BinlogSyncer) isClosed() bool { @@ -371,7 +374,7 @@ func (b *BinlogSyncer) GetNextPosition() Position { // StartSync starts syncing from the `pos` position. func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { - log.Infof("begin to sync binlog from position %s", pos) + log.Debugf("begin to sync binlog from position %s", pos) b.m.Lock() defer b.m.Unlock() @@ -389,7 +392,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { // StartSyncGTID starts syncing from the `gset` GTIDSet. func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { - log.Infof("begin to sync binlog from GTID set %s", gset) + log.Debugf("begin to sync binlog from GTID set %s", gset) b.prevGset = gset @@ -593,13 +596,13 @@ func (b *BinlogSyncer) retrySync() error { if b.currGset != nil { msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset) } - log.Infof(msg) + log.Debugf(msg) if err := b.prepareSyncGTID(b.prevGset); err != nil { return errors.Trace(err) } } else { - log.Infof("begin to re-sync from %s", b.nextPos) + log.Debugf("begin to re-sync from %s", b.nextPos) if err := b.prepareSyncPos(b.nextPos); err != nil { return errors.Trace(err) } @@ -684,6 +687,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } for { + // Read packet until CTX is done if not will continue after the loop select { case <-b.ctx.Done(): s.close() @@ -697,7 +701,8 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { return } - log.Errorf("retry sync err: %v, wait 1s and retry again", err) + log.Errorf("retry sync err: %v, wait %s s and retry again", err, b.cfg.WaitTimeBetweenConnectionSeconds) + time.Sleep(b.cfg.WaitTimeBetweenConnectionSeconds) continue } } @@ -731,7 +736,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { // refer to https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block // when COM_BINLOG_DUMP command use BINLOG_DUMP_NON_BLOCK flag, // if there is no more event to send an EOF_Packet instead of blocking the connection - log.Info("receive EOF packet, no more binlog event now.") + log.Debug("receive EOF packet, no more binlog event now.") continue default: log.Errorf("invalid stream header %c", data[0]) @@ -787,7 +792,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { case *RotateEvent: b.nextPos.Name = string(event.NextLogName) b.nextPos.Pos = uint32(event.Position) - log.Infof("rotate to %s", b.nextPos) + log.Debugf("rotate to %s", b.nextPos) case *GTIDEvent: if b.prevGset == nil { break @@ -860,5 +865,5 @@ func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) { log.Error(errors.Trace(err)) } } - log.Infof("kill last connection id %d", id) + log.Debugf("kill last connection id %d", id) } From 6855741c9e456fc3484d59f47caa3ebf25965286 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Thu, 6 Jan 2022 15:03:34 +0200 Subject: [PATCH 02/27] Added safe conn close --- canal/canal.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 86dd13bee..2e93a77a0 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -250,8 +250,10 @@ func (c *Canal) Close() { c.cancel() c.syncer.Close() c.connLock.Lock() - c.conn.Close() - c.conn = nil + if c.conn != nil { + c.conn.Close() + c.conn = nil + } c.connLock.Unlock() _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) From e6565cd6b6d87c5774d0071aed467fc3fabb39f9 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Thu, 6 Jan 2022 15:07:18 +0200 Subject: [PATCH 03/27] Added file name to each event --- replication/binlogsyncer.go | 6 +++--- replication/event.go | 1 + replication/parser.go | 27 ++++++++++++++------------- replication/parser_test.go | 6 +++--- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 10b25b04b..1c58773ac 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -724,7 +724,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(s, data); err != nil { + if err = b.parseEvent(b.nextPos.Name, s, data); err != nil { s.closeWithError(err) return } @@ -745,7 +745,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } } -func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { +func (b *BinlogSyncer) parseEvent(fileName string, s *BinlogStreamer, data []byte) error { //skip OK byte, 0x00 data = data[1:] @@ -756,7 +756,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { data = data[2:] } - e, err := b.parser.Parse(data) + e, err := b.parser.Parse(fileName, data) if err != nil { return errors.Trace(err) } diff --git a/replication/event.go b/replication/event.go index 9111691d2..40208b3cb 100644 --- a/replication/event.go +++ b/replication/event.go @@ -66,6 +66,7 @@ type EventHeader struct { EventSize uint32 LogPos uint32 Flags uint16 + FileName string } func (h *EventHeader) Decode(data []byte) error { diff --git a/replication/parser.go b/replication/parser.go index 7688fe7b0..21bcdb86b 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -86,7 +86,7 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - if err = p.parseFormatDescriptionEvent(f, onEvent); err != nil { + if err = p.parseFormatDescriptionEvent(f, name, onEvent); err != nil { return errors.Annotatef(err, "parse FormatDescriptionEvent") } } @@ -95,20 +95,20 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - return p.ParseReader(f, onEvent) + return p.ParseReader(f, name, onEvent) } -func (p *BinlogParser) parseFormatDescriptionEvent(r io.Reader, onEvent OnEventFunc) error { - _, err := p.parseSingleEvent(r, onEvent) +func (p *BinlogParser) parseFormatDescriptionEvent(r io.Reader, fileName string, onEvent OnEventFunc) error { + _, err := p.parseSingleEvent(r, fileName, onEvent) return err } // ParseSingleEvent parses single binlog event and passes the event to onEvent function. -func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error) { - return p.parseSingleEvent(r, onEvent) +func (p *BinlogParser) ParseSingleEvent(r io.Reader, fileName string, onEvent OnEventFunc) (bool, error) { + return p.parseSingleEvent(r, fileName, onEvent) } -func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error) { +func (p *BinlogParser) parseSingleEvent(r io.Reader, fileName string, onEvent OnEventFunc) (bool, error) { var err error var n int64 @@ -123,7 +123,7 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, } var h *EventHeader - h, err = p.parseHeader(buf.Bytes()) + h, err = p.parseHeader(fileName, buf.Bytes()) if err != nil { return false, errors.Trace(err) } @@ -162,13 +162,13 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, return false, nil } -func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { +func (p *BinlogParser) ParseReader(r io.Reader, name string, onEvent OnEventFunc) error { for { if atomic.LoadUint32(&p.stopProcessing) == 1 { break } - done, err := p.parseSingleEvent(r, onEvent) + done, err := p.parseSingleEvent(r, name, onEvent) if err != nil { if err == errMissingTableMapEvent { continue @@ -212,13 +212,14 @@ func (p *BinlogParser) SetFlavor(flavor string) { p.flavor = flavor } -func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) { +func (p *BinlogParser) parseHeader(fileName string, data []byte) (*EventHeader, error) { h := new(EventHeader) err := h.Decode(data) if err != nil { return nil, err } + h.FileName = fileName return h, nil } @@ -321,10 +322,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( // into the parser for this to work properly on any given event. // Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace // an existing one. -func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { +func (p *BinlogParser) Parse(fileName string, data []byte) (*BinlogEvent, error) { rawData := data - h, err := p.parseHeader(data) + h, err := p.parseHeader(fileName, data) if err != nil { return nil, err diff --git a/replication/parser_test.go b/replication/parser_test.go index 2f957198c..f81c7908a 100644 --- a/replication/parser_test.go +++ b/replication/parser_test.go @@ -31,7 +31,7 @@ func (t *testSyncerSuite) TestIndexOutOfRange(c *C) { 0x3065f: {tableIDSize: 6, TableID: 0x3065f, Flags: 0x1, Schema: []uint8{0x73, 0x65, 0x69, 0x75, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72}, Table: []uint8{0x63, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x6f, 0x75, 0x74, 0x5f, 0x6c, 0x65, 0x74, 0x74, 0x65, 0x72}, ColumnCount: 0xd, ColumnType: []uint8{0x3, 0x3, 0x3, 0x3, 0x1, 0x12, 0xf, 0xf, 0x12, 0xf, 0xf, 0x3, 0xf}, ColumnMeta: []uint16{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x180, 0x180, 0x0, 0x180, 0x180, 0x0, 0x2fd}, NullBitmap: []uint8{0xe0, 0x17}}, } - _, err := parser.Parse([]byte{ + _, err := parser.Parse("test", []byte{ /* 0x00, */ 0xc1, 0x86, 0x8e, 0x55, 0x1e, 0xa5, 0x14, 0x80, 0xa, 0x55, 0x0, 0x0, 0x0, 0x7, 0xc, 0xbf, 0xe, 0x0, 0x0, 0x5f, 0x6, 0x3, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0xd, 0xff, 0x0, 0x0, 0x19, 0x63, 0x7, 0x0, 0xca, 0x61, 0x5, 0x0, 0x5e, 0xf7, 0xc, 0x0, 0xf5, 0x7, @@ -63,14 +63,14 @@ func (t *testSyncerSuite) TestParseEvent(c *C) { for _, tc := range testCases { r := bytes.NewReader(tc.byteData) - _, err := parser.ParseSingleEvent(r, func(e *BinlogEvent) error { + _, err := parser.ParseSingleEvent(r, "test", func(e *BinlogEvent) error { c.Assert(e.Header.EventType, Equals, STOP_EVENT) c.Assert(e.Header.EventSize, Equals, tc.eventSize) return nil }) c.Assert(err, IsNil) - e, err2 := parser.Parse(tc.byteData) + e, err2 := parser.Parse("test", tc.byteData) c.Assert(e.Header.EventType, Equals, STOP_EVENT) c.Assert(e.Header.EventSize, Equals, tc.eventSize) c.Assert(err2, IsNil) From cb4f084de1b27dc965202c3ae37d924a088b5174 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Thu, 6 Jan 2022 15:07:59 +0200 Subject: [PATCH 04/27] Added gtid to each event --- canal/sync.go | 1 + replication/event.go | 1 + 2 files changed, 2 insertions(+) diff --git a/canal/sync.go b/canal/sync.go index 94bd3f6da..9a8df4b96 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -270,6 +270,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) + events.Header.Gtid = c.SyncedGTIDSet() return c.eventHandler.OnRow(events) } diff --git a/replication/event.go b/replication/event.go index 40208b3cb..56a5e0d7a 100644 --- a/replication/event.go +++ b/replication/event.go @@ -67,6 +67,7 @@ type EventHeader struct { LogPos uint32 Flags uint16 FileName string + Gtid GTIDSet } func (h *EventHeader) Decode(data []byte) error { From 8d7d13c40e4f1a2e9aa6ad06f39895be20936bb0 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Sun, 16 Jan 2022 11:39:23 +0200 Subject: [PATCH 05/27] Removed FlushBinlog which was only used for testing --- canal/sync.go | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 9a8df4b96..27554c905 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -274,35 +274,6 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return c.eventHandler.OnRow(events) } -func (c *Canal) FlushBinlog() error { - _, err := c.Execute("FLUSH BINARY LOGS") - return errors.Trace(err) -} - -func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { - timer := time.NewTimer(timeout) - for { - select { - case <-timer.C: - return errors.Errorf("wait position %v too long > %s", pos, timeout) - default: - err := c.FlushBinlog() - if err != nil { - return errors.Trace(err) - } - curPos := c.master.Position() - if curPos.Compare(pos) >= 0 { - return nil - } else { - log.Debugf("master pos is %v, wait catching %v", curPos, pos) - time.Sleep(100 * time.Millisecond) - } - } - } - - return nil -} - func (c *Canal) GetMasterPos() (mysql.Position, error) { rr, err := c.Execute("SHOW MASTER STATUS") if err != nil { @@ -337,12 +308,3 @@ func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) { } return gset, nil } - -func (c *Canal) CatchMasterPos(timeout time.Duration) error { - pos, err := c.GetMasterPos() - if err != nil { - return errors.Trace(err) - } - - return c.WaitUntilPos(pos, timeout) -} From f81397d1405e66ee7205f9edb61d93cfc85ef025 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Sun, 16 Jan 2022 11:45:02 +0200 Subject: [PATCH 06/27] Fixed testing --- canal/canal_test.go | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/canal/canal_test.go b/canal/canal_test.go index a5bb6ed92..e1c8ad7e3 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -141,10 +141,48 @@ func (s *canalTestSuite) TestCanal(c *C) { s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`") s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18") - err := s.c.CatchMasterPos(10 * time.Second) + err := CatchMasterPos(s.c, 10*time.Second) c.Assert(err, IsNil) } +func CatchMasterPos(c *Canal, timeout time.Duration) error { + pos, err := c.GetMasterPos() + if err != nil { + return errors.Trace(err) + } + + return WaitUntilPos(c, pos, timeout) +} + +func FlushBinlog(c *Canal) error { + _, err := c.Execute("FLUSH BINARY LOGS") + return errors.Trace(err) +} + +func WaitUntilPos(c *Canal, pos mysql.Position, timeout time.Duration) error { + timer := time.NewTimer(timeout) + for { + select { + case <-timer.C: + return errors.Errorf("wait position %v too long > %s", pos, timeout) + default: + err := FlushBinlog(c) + if err != nil { + return errors.Trace(err) + } + curPos := c.master.Position() + if curPos.Compare(pos) >= 0 { + return nil + } else { + log.Debugf("master pos is %v, wait catching %v", curPos, pos) + time.Sleep(100 * time.Millisecond) + } + } + } + + return nil +} + func (s *canalTestSuite) TestCanalFilter(c *C) { // included sch, err := s.c.GetTable("test", "canal_test") From 824d6508555f5f0c30acef0d13d8c590c34be261 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Sun, 30 Jan 2022 14:05:43 +0200 Subject: [PATCH 07/27] Added delete on table when deleted --- replication/binlogsyncer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 1c58773ac..0ebda2e1a 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "os" + "strings" "sync" "time" @@ -725,6 +726,11 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: if err = b.parseEvent(b.nextPos.Name, s, data); err != nil { + // if the error is errMissingTableMapEvent skip + if strings.Contains(strings.ToLower(errors.Cause(err).Error()), errMissingTableMapEvent.Error()) { + log.Errorf("invalid table skipping , problay deleted? %s", err.Error()) + continue + } s.closeWithError(err) return } From 61a148b69f317e4eb55ca12a56cf782d0f70a213 Mon Sep 17 00:00:00 2001 From: "DESKTOP-E2OL3QR\\Eitam" Date: Sun, 30 Jan 2022 14:06:02 +0200 Subject: [PATCH 08/27] probably grammar --- replication/binlogsyncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 0ebda2e1a..e56875d15 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -728,7 +728,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { if err = b.parseEvent(b.nextPos.Name, s, data); err != nil { // if the error is errMissingTableMapEvent skip if strings.Contains(strings.ToLower(errors.Cause(err).Error()), errMissingTableMapEvent.Error()) { - log.Errorf("invalid table skipping , problay deleted? %s", err.Error()) + log.Errorf("invalid table skipping , probably deleted? %s", err.Error()) continue } s.closeWithError(err) From 05b87bb5301ad0edd93b067f57fd271757f13c17 Mon Sep 17 00:00:00 2001 From: Eitam Date: Wed, 16 Feb 2022 11:07:21 +0200 Subject: [PATCH 09/27] Removed spamming log --- replication/binlogsyncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index e56875d15..37b1abe90 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -728,7 +728,6 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { if err = b.parseEvent(b.nextPos.Name, s, data); err != nil { // if the error is errMissingTableMapEvent skip if strings.Contains(strings.ToLower(errors.Cause(err).Error()), errMissingTableMapEvent.Error()) { - log.Errorf("invalid table skipping , probably deleted? %s", err.Error()) continue } s.closeWithError(err) From fa2e8fa1122c3f2400ab8f178bd9285a9ed50019 Mon Sep 17 00:00:00 2001 From: Eitam Date: Tue, 22 Feb 2022 19:11:30 +0200 Subject: [PATCH 10/27] Added covert to string interface if needed --- replication/row_event.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/replication/row_event.go b/replication/row_event.go index 1729aefb7..a6ffd7611 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1141,6 +1141,10 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ v, err = littleDecodeBit(data, nbits, n) case MYSQL_TYPE_BLOB: v, n, err = decodeBlob(data, meta) + newValue, ok := convertToString(v) + if ok { + v = newValue + } case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) @@ -1152,6 +1156,10 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ length = int(FixedLengthInt(data[0:meta])) n = length + int(meta) v, err = e.decodeJsonBinary(data[meta:n]) + newValue, ok := convertToString(v) + if ok { + v = newValue + } case MYSQL_TYPE_GEOMETRY: // MySQL saves Geometry as Blob in binlog // Seem that the binary format is SRID (4 bytes) + WKB, outer can use @@ -1167,6 +1175,20 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ return v, n, err } +// convertToString receive an interface and convert it to string if match the desired type +func convertToString(s interface{}) (string, bool) { + if s == nil { + return "", false + } + switch v := s.(type) { + case []uint8: + str := string(v) + return str, true + default: + return "", false + } +} + func decodeString(data []byte, length int) (v string, n int) { if length < 256 { length = int(data[0]) From fdd11a693aefb45821491f155a92a08766b4ff8e Mon Sep 17 00:00:00 2001 From: Eitam Date: Wed, 23 Feb 2022 17:57:39 +0200 Subject: [PATCH 11/27] Remove unneeded log on SP --- canal/sync.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/canal/sync.go b/canal/sync.go index 27554c905..29226646d 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -2,6 +2,7 @@ package canal import ( "fmt" + "strings" "sync/atomic" "time" @@ -142,7 +143,15 @@ func (c *Canal) runSyncBinlog() error { case *replication.QueryEvent: stmts, _, err := c.parser.Parse(string(e.Query), "", "") if err != nil { - log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) + msg := err.Error() + if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) { + // Cut the first row from the message since it contain the procedure call and not the entire message + fl := strings.Split(msg, "\n") + log.Errorf("parse SP Error: (%s)", fl[0]) + } else { + log.Errorf("parse query(%s) err %v", e.Query, err) + } + log.Error("will skip this event") continue } for _, stmt := range stmts { From 73477d3615c9ceb9d8771481a5fab58ae2f48d03 Mon Sep 17 00:00:00 2001 From: Eitam Date: Thu, 31 Mar 2022 17:58:28 +0300 Subject: [PATCH 12/27] Change mysql_date default zero based value to nil --- replication/row_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/row_event.go b/replication/row_event.go index a6ffd7611..259431538 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1109,7 +1109,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ n = 3 i32 := uint32(FixedLengthInt(data[0:3])) if i32 == 0 { - v = "0000-00-00" + v = nil } else { v = fmt.Sprintf("%04d-%02d-%02d", i32/(16*32), i32/32%16, i32%32) } From 007d4192821f915712ad6cd6684ed6d2f00f0a5b Mon Sep 17 00:00:00 2001 From: Eitam Date: Thu, 7 Apr 2022 10:38:07 +0300 Subject: [PATCH 13/27] Change mysql_MYSQL_TYPE_TIMESTAMP2 and MYSQL_TYPE_TIMESTAMP to be default nil --- replication/row_event.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 259431538..60e91cae6 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1059,7 +1059,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ n = 4 t := binary.LittleEndian.Uint32(data) if t == 0 { - v = formatZeroTime(0, 0) + v = nil } else { v = e.parseFracTime(fracTime{ Time: time.Unix(int64(t), 0), @@ -1068,13 +1068,17 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ }) } case MYSQL_TYPE_TIMESTAMP2: - v, n, err = decodeTimestamp2(data, meta, e.timestampStringLocation) - v = e.parseFracTime(v) + if meta == 0 { + v = nil + } else { + v, n, err = decodeTimestamp2(data, meta, e.timestampStringLocation) + v = e.parseFracTime(v) + } case MYSQL_TYPE_DATETIME: n = 8 i64 := binary.LittleEndian.Uint64(data) if i64 == 0 { - v = formatZeroTime(0, 0) + v = nil } else { d := i64 / 1000000 t := i64 % 1000000 From d15a7cee3d7946d9437bf0510e972d5b063ab040 Mon Sep 17 00:00:00 2001 From: Eitam Date: Thu, 7 Apr 2022 11:29:29 +0300 Subject: [PATCH 14/27] Change decodeDatetime2 default to zero --- replication/row_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/row_event.go b/replication/row_event.go index 60e91cae6..9b3ec4700 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1428,7 +1428,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { } if intPart == 0 { - return formatZeroTime(int(frac), int(dec)), n, nil + return nil, n, nil } tmp := intPart<<24 + frac From 1f7d701c2794b814c8abbe48810c7974573b286c Mon Sep 17 00:00:00 2001 From: Eitam Date: Thu, 7 Apr 2022 13:04:16 +0300 Subject: [PATCH 15/27] Change decodeTimestamp2 to return nil in case sec = 0 --- replication/row_event.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 9b3ec4700..3b64f6620 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1068,12 +1068,8 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ }) } case MYSQL_TYPE_TIMESTAMP2: - if meta == 0 { - v = nil - } else { - v, n, err = decodeTimestamp2(data, meta, e.timestampStringLocation) - v = e.parseFracTime(v) - } + v, n, err = decodeTimestamp2(data, meta, e.timestampStringLocation) + v = e.parseFracTime(v) case MYSQL_TYPE_DATETIME: n = 8 i64 := binary.LittleEndian.Uint64(data) @@ -1399,7 +1395,7 @@ func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Loc } if sec == 0 { - return formatZeroTime(int(usec), int(dec)), n, nil + return nil, n, nil } return fracTime{ From 411b75d5bb6da391c2a589fb8ac1e47709e8d968 Mon Sep 17 00:00:00 2001 From: Eitam Date: Thu, 14 Apr 2022 16:35:56 +0300 Subject: [PATCH 16/27] Add limit to max host name length --- replication/binlogsyncer.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 37b1abe90..7e27fe4c9 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -19,7 +19,8 @@ import ( ) var ( - errSyncRunning = errors.New("Sync is running, must Close first") + errSyncRunning = errors.New("Sync is running, must Close first") + maxHostNameLength = 60 ) // BinlogSyncerConfig is the configuration for BinlogSyncer. @@ -524,7 +525,10 @@ func (b *BinlogSyncer) writeRegisterSlaveCommand() error { b.c.ResetSequence() hostname := b.localHostname() - + if len(hostname) > maxHostNameLength { + runes := []rune(hostname) + hostname = string(runes[0:maxHostNameLength]) + } // This should be the name of slave host not the host we are connecting to. data := make([]byte, 4+1+4+1+len(hostname)+1+len(b.cfg.User)+1+len(b.cfg.Password)+2+4+4) pos := 4 From c349655f92e62c8d54c0be03eb6d123b8f7e23a5 Mon Sep 17 00:00:00 2001 From: eitamos Date: Tue, 17 Jan 2023 11:01:09 +0200 Subject: [PATCH 17/27] Change log debug level --- canal/sync.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 29226646d..231baa6eb 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -147,11 +147,11 @@ func (c *Canal) runSyncBinlog() error { if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) { // Cut the first row from the message since it contain the procedure call and not the entire message fl := strings.Split(msg, "\n") - log.Errorf("parse SP Error: (%s)", fl[0]) + log.Debugf("parse SP Error: (%s)", fl[0]) } else { - log.Errorf("parse query(%s) err %v", e.Query, err) + log.Debugf("parse query(%s) err %v", e.Query, err) } - log.Error("will skip this event") + log.Debugln("will skip this event") continue } for _, stmt := range stmts { From c3593a75dbd9b0e723c24e66c0599bebf74971c1 Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 11:08:31 +0300 Subject: [PATCH 18/27] add latin support --- replication/row_event.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 3b64f6620..3c3c56a4c 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "golang.org/x/text/encoding/charmap" + "golang.org/x/text/transform" "io" "strconv" "strings" @@ -1148,9 +1150,9 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - v, n = decodeString(data, length) + v, n = decodeStringLatin1(data, length) case MYSQL_TYPE_STRING: - v, n = decodeString(data, length) + v, n = decodeStringLatin1(data, length) case MYSQL_TYPE_JSON: // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 length = int(FixedLengthInt(data[0:meta])) @@ -1204,6 +1206,29 @@ func decodeString(data []byte, length int) (v string, n int) { return } +func decodeStringLatin1(data []byte, length int) (v string, n int) { + // Define the Latin1 decoder + decoder := charmap.ISO8859_1.NewDecoder() + + if length < 256 { + // If the length is smaller than 256, extract the length from the first byte + length = int(data[0]) + n = length + 1 + // Use the decoder to convert to a string with Latin1 encoding + decodedBytes, _, _ := transform.Bytes(decoder, data[1:n]) + v = string(decodedBytes) + } else { + // If the length is larger, extract it using LittleEndian + length = int(binary.LittleEndian.Uint16(data[0:])) + n = length + 2 + // Use the decoder to convert to a string with Latin1 encoding + decodedBytes, _, _ := transform.Bytes(decoder, data[2:n]) + v = string(decodedBytes) + } + + return +} + // ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137 const digitsPerInteger int = 9 From fd2c5822175b3b9e7ce012f9b7363329e01d9807 Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 11:38:26 +0300 Subject: [PATCH 19/27] add latin support --- replication/row_event.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 3c3c56a4c..ad5108d64 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "time" + "unicode/utf8" "github.com/pingcap/errors" "github.com/shopspring/decimal" @@ -1150,9 +1151,17 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - v, n = decodeStringLatin1(data, length) + if utf8.Valid(data) { + v, n = decodeString(data, length) + } else { + v, n = decodeStringLatin1(data, length) + } case MYSQL_TYPE_STRING: - v, n = decodeStringLatin1(data, length) + if utf8.Valid(data) { + v, n = decodeString(data, length) + } else { + v, n = decodeStringLatin1(data, length) + } case MYSQL_TYPE_JSON: // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 length = int(FixedLengthInt(data[0:meta])) From 3d213aec9e7ca0017958ce4bc94b1fcc7f207d3a Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 12:43:20 +0300 Subject: [PATCH 20/27] add latin support --- replication/row_event.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index ad5108d64..0e18a460a 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1151,16 +1151,16 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - if utf8.Valid(data) { - v, n = decodeString(data, length) - } else { + if !utf8.Valid(data) && isLatin1(data) { v, n = decodeStringLatin1(data, length) + } else { + v, n = decodeString(data, length) } case MYSQL_TYPE_STRING: - if utf8.Valid(data) { - v, n = decodeString(data, length) - } else { + if !utf8.Valid(data) && isLatin1(data) { v, n = decodeStringLatin1(data, length) + } else { + v, n = decodeString(data, length) } case MYSQL_TYPE_JSON: // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 @@ -1238,6 +1238,14 @@ func decodeStringLatin1(data []byte, length int) (v string, n int) { return } +func isLatin1(data []byte) bool { + decoder := charmap.ISO8859_1.NewDecoder() + decodedBytes, _, _ := transform.Bytes(decoder, data) + + // If the decoded result matches the original bytes, it suggests it's Latin1 + return string(decodedBytes) == string(data) +} + // ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137 const digitsPerInteger int = 9 From b34a98d3de5c83fac33df003a1a3ce5431a313af Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 13:01:42 +0300 Subject: [PATCH 21/27] add latin support --- replication/row_event.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/replication/row_event.go b/replication/row_event.go index 0e18a460a..df02b7ac0 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1241,9 +1241,10 @@ func decodeStringLatin1(data []byte, length int) (v string, n int) { func isLatin1(data []byte) bool { decoder := charmap.ISO8859_1.NewDecoder() decodedBytes, _, _ := transform.Bytes(decoder, data) + encodedBackToLatin1, _, _ := transform.Bytes(decoder, decodedBytes) // If the decoded result matches the original bytes, it suggests it's Latin1 - return string(decodedBytes) == string(data) + return string(encodedBackToLatin1) == string(data) } // ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137 From 7222b87fe4da7b1b2068c3fbf95ba525408e1c78 Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 14:53:32 +0300 Subject: [PATCH 22/27] add latin support --- replication/row_event.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index df02b7ac0..aa9e20876 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1151,13 +1151,15 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - if !utf8.Valid(data) && isLatin1(data) { + fmt.Printf("Column charset is: %v", e.Table.ColumnCharset) + if !utf8.Valid(data) && len(e.Table.ColumnCharset) > 0 { + v, n = decodeStringLatin1(data, length) } else { v, n = decodeString(data, length) } case MYSQL_TYPE_STRING: - if !utf8.Valid(data) && isLatin1(data) { + if !utf8.Valid(data) && len(e.Table.ColumnCharset) > 0 { v, n = decodeStringLatin1(data, length) } else { v, n = decodeString(data, length) @@ -1238,15 +1240,6 @@ func decodeStringLatin1(data []byte, length int) (v string, n int) { return } -func isLatin1(data []byte) bool { - decoder := charmap.ISO8859_1.NewDecoder() - decodedBytes, _, _ := transform.Bytes(decoder, data) - encodedBackToLatin1, _, _ := transform.Bytes(decoder, decodedBytes) - - // If the decoded result matches the original bytes, it suggests it's Latin1 - return string(encodedBackToLatin1) == string(data) -} - // ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137 const digitsPerInteger int = 9 From b4851e04f18caa0795833f954826089e1aa1072b Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 16:11:32 +0300 Subject: [PATCH 23/27] add latin support --- replication/binlogsyncer.go | 1 + replication/parser.go | 10 +++++++++- replication/row_event.go | 8 ++++---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 7e27fe4c9..384246591 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -163,6 +163,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation) b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) + b.parser.SetIsLatin(b.c.GetCharset() == "latin1") b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) diff --git a/replication/parser.go b/replication/parser.go index 21bcdb86b..4f4ca3ed8 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -22,7 +22,8 @@ var ( type BinlogParser struct { // "mysql" or "mariadb", if not set, use "mysql" by default - flavor string + flavor string + isLatin bool format *FormatDescriptionEvent @@ -212,6 +213,10 @@ func (p *BinlogParser) SetFlavor(flavor string) { p.flavor = flavor } +func (p *BinlogParser) SetIsLatin(isLatin bool) { + p.isLatin = isLatin +} + func (p *BinlogParser) parseHeader(fileName string, data []byte) (*EventHeader, error) { h := new(EventHeader) err := h.Decode(data) @@ -303,6 +308,9 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( } if te, ok := e.(*TableMapEvent); ok { + if p.isLatin { + te.isLatin = true + } p.tables[te.TableID] = te } diff --git a/replication/row_event.go b/replication/row_event.go index aa9e20876..f1cd2a425 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -25,6 +25,7 @@ var errMissingTableMapEvent = errors.New("invalid table id, no corresponding tab type TableMapEvent struct { flavor string tableIDSize int + isLatin bool TableID uint64 @@ -1151,15 +1152,14 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - fmt.Printf("Column charset is: %v", e.Table.ColumnCharset) - if !utf8.Valid(data) && len(e.Table.ColumnCharset) > 0 { - + fmt.Printf("islatin is: %v", e.Table.isLatin) + if !utf8.Valid(data) && e.Table.isLatin { v, n = decodeStringLatin1(data, length) } else { v, n = decodeString(data, length) } case MYSQL_TYPE_STRING: - if !utf8.Valid(data) && len(e.Table.ColumnCharset) > 0 { + if !utf8.Valid(data) && e.Table.isLatin { v, n = decodeStringLatin1(data, length) } else { v, n = decodeString(data, length) From b939f182292b2d72699a5bfeb1b7f9147a7201ac Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 16:16:55 +0300 Subject: [PATCH 24/27] move to function --- replication/binlogsyncer.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 384246591..e6c579b1c 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -163,13 +163,20 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation) b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) - b.parser.SetIsLatin(b.c.GetCharset() == "latin1") + b.parser.SetIsLatin(b.getIsCharsetLatin1()) b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) return b } +func (b *BinlogSyncer) getIsCharsetLatin1() bool { + if b.c != nil { + return b.c.GetCharset() == "latin1" + } + return false +} + // Close closes the BinlogSyncer. func (b *BinlogSyncer) Close() { b.m.Lock() From 88f0609a2156548f5387f08b740d77e6ed57cead Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Sun, 27 Apr 2025 16:42:12 +0300 Subject: [PATCH 25/27] move to function --- replication/binlogsyncer.go | 5 +---- replication/parser.go | 4 +--- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index e6c579b1c..08f9cd79c 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -171,10 +171,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { } func (b *BinlogSyncer) getIsCharsetLatin1() bool { - if b.c != nil { - return b.c.GetCharset() == "latin1" - } - return false + return b.cfg.Charset == "latin1" } // Close closes the BinlogSyncer. diff --git a/replication/parser.go b/replication/parser.go index 4f4ca3ed8..966884a99 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -308,9 +308,7 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( } if te, ok := e.(*TableMapEvent); ok { - if p.isLatin { - te.isLatin = true - } + te.isLatin = p.isLatin p.tables[te.TableID] = te } From 0799a57003e11709593d2b8ab9dbe1dc669f0cd4 Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Mon, 28 Apr 2025 11:50:38 +0300 Subject: [PATCH 26/27] fixes and tests --- replication/binlogsyncer.go | 6 +- replication/parser.go | 8 +-- replication/row_event.go | 23 ++++---- replication/row_event_test.go | 100 ++++++++++++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 21 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 08f9cd79c..6521d11aa 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -163,17 +163,13 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation) b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) - b.parser.SetIsLatin(b.getIsCharsetLatin1()) + b.parser.SetCharset(b.cfg.Charset) b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) return b } -func (b *BinlogSyncer) getIsCharsetLatin1() bool { - return b.cfg.Charset == "latin1" -} - // Close closes the BinlogSyncer. func (b *BinlogSyncer) Close() { b.m.Lock() diff --git a/replication/parser.go b/replication/parser.go index 966884a99..cee6b060c 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -23,7 +23,7 @@ var ( type BinlogParser struct { // "mysql" or "mariadb", if not set, use "mysql" by default flavor string - isLatin bool + charset string format *FormatDescriptionEvent @@ -213,8 +213,8 @@ func (p *BinlogParser) SetFlavor(flavor string) { p.flavor = flavor } -func (p *BinlogParser) SetIsLatin(isLatin bool) { - p.isLatin = isLatin +func (p *BinlogParser) SetCharset(charset string) { + p.charset = charset } func (p *BinlogParser) parseHeader(fileName string, data []byte) (*EventHeader, error) { @@ -308,7 +308,7 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( } if te, ok := e.(*TableMapEvent); ok { - te.isLatin = p.isLatin + te.charset = p.charset p.tables[te.TableID] = te } diff --git a/replication/row_event.go b/replication/row_event.go index f1cd2a425..d18550390 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -25,7 +25,7 @@ var errMissingTableMapEvent = errors.New("invalid table id, no corresponding tab type TableMapEvent struct { flavor string tableIDSize int - isLatin bool + charset string TableID uint64 @@ -1152,18 +1152,9 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ case MYSQL_TYPE_VARCHAR, MYSQL_TYPE_VAR_STRING: length = int(meta) - fmt.Printf("islatin is: %v", e.Table.isLatin) - if !utf8.Valid(data) && e.Table.isLatin { - v, n = decodeStringLatin1(data, length) - } else { - v, n = decodeString(data, length) - } + v, n = decodeByCharSet(data, e.Table.charset, length) case MYSQL_TYPE_STRING: - if !utf8.Valid(data) && e.Table.isLatin { - v, n = decodeStringLatin1(data, length) - } else { - v, n = decodeString(data, length) - } + v, n = decodeByCharSet(data, e.Table.charset, length) case MYSQL_TYPE_JSON: // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 length = int(FixedLengthInt(data[0:meta])) @@ -1202,6 +1193,14 @@ func convertToString(s interface{}) (string, bool) { } } +func decodeByCharSet(data []byte, charset string, length int) (v string, n int) { + if !utf8.Valid(data) && charset == "latin1" { + return decodeStringLatin1(data, length) + } else { + return decodeString(data, length) + } +} + func decodeString(data []byte, length int) (v string, n int) { if length < 256 { length = int(data[0]) diff --git a/replication/row_event_test.go b/replication/row_event_test.go index d81277ff6..99907e7be 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1,7 +1,10 @@ package replication import ( + "bytes" + "encoding/binary" "fmt" + "testing" . "github.com/pingcap/check" "github.com/shopspring/decimal" @@ -1341,3 +1344,100 @@ func (_ *testDecodeSuite) BenchmarkInt(c *C) { } } } + +func TestDecodeStringLatin1(t *testing.T) { + tests := []struct { + name string + input []byte + length int + wantStr string + wantRead int + }{ + { + name: "Short Latin1 string", + input: append([]byte{5}, []byte{0xe2, 'f', 'g', 'h', 0xe9}...), // length byte + 5 bytes: 'âfghé' + length: 5, + wantStr: "âfghé", + wantRead: 6, + }, + { + name: "Empty string", + input: []byte{0}, + length: 0, + wantStr: "", + wantRead: 1, + }, + { + name: "Long string (>255, 2-byte length)", + input: func() []byte { + buf := new(bytes.Buffer) + binary.Write(buf, binary.LittleEndian, uint16(5)) + buf.Write([]byte{0xe2, 'f', 'g', 'h', 0xe9}) // 'âfghé' + return buf.Bytes() + }(), + length: 300, + wantStr: "âfghé", + wantRead: 7, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStr, gotRead := decodeStringLatin1(tt.input, tt.length) + if gotStr != tt.wantStr { + t.Errorf("decodeStringLatin1() got string = %q, want %q", gotStr, tt.wantStr) + } + if gotRead != tt.wantRead { + t.Errorf("decodeStringLatin1() got read bytes = %d, want %d", gotRead, tt.wantRead) + } + }) + } +} + +func TestDecodeByCharSet(t *testing.T) { + tests := []struct { + name string + input []byte + charset string + length int + wantStr string + }{ + { + name: "Valid UTF-8 with UTF-8 charset", + input: append([]byte{11}, []byte("hello world")...), + charset: "utf8", + length: 11, + wantStr: "hello world", + }, + { + name: "Invalid UTF-8 but charset is latin1", + input: append([]byte{3}, []byte{0xe2, 0x28, 0xa1}...), // invalid utf8 + charset: "latin1", + length: 3, + wantStr: "â(¡", + }, + { + name: "Invalid UTF-8 but charset is utf8", + input: append([]byte{3}, []byte{0xe2, 0x28, 0xa1}...), // invalid utf8 + charset: "utf8", + length: 3, + wantStr: "\xe2(\xa1", + }, + { + name: "Valid UTF-8 and charset latin1", + input: append([]byte{11}, []byte("hello world")...), + charset: "latin1", + length: 11, + wantStr: "hello world", // because utf8.Valid is true, decodeString should be used + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStr, _ := decodeByCharSet(tt.input, tt.charset, tt.length) + if gotStr != tt.wantStr { + t.Errorf("decodeByCharSet() = %q, want %q", gotStr, tt.wantStr) + } + }) + } +} From a2053cd93ae92940c93995f4024ee64104c8797e Mon Sep 17 00:00:00 2001 From: Sigalit Kanevsky Date: Mon, 28 Apr 2025 12:26:51 +0300 Subject: [PATCH 27/27] fixes --- replication/row_event.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index d18550390..85c61c834 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1194,11 +1194,14 @@ func convertToString(s interface{}) (string, bool) { } func decodeByCharSet(data []byte, charset string, length int) (v string, n int) { - if !utf8.Valid(data) && charset == "latin1" { - return decodeStringLatin1(data, length) - } else { - return decodeString(data, length) + if !utf8.Valid(data) { + if charset == "latin1" { + return decodeStringLatin1(data, length) + } else { + fmt.Printf("Data is not in utf8 or latin1") + } } + return decodeString(data, length) } func decodeString(data []byte, length int) (v string, n int) {