Skip to content

feat: Charset parameter for Mysql connection #1027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ef55c4c
Added waitTimeBetweenConnection check
eitamring Jan 6, 2022
6855741
Added safe conn close
eitamring Jan 6, 2022
e6565cd
Added file name to each event
eitamring Jan 6, 2022
cb4f084
Added gtid to each event
eitamring Jan 6, 2022
8d7d13c
Removed FlushBinlog which was only used for testing
eitamring Jan 16, 2022
f81397d
Fixed testing
eitamring Jan 16, 2022
f77308e
Merge pull request #1 from RiveryIO/feature/eitam/removed_flush_for_t…
eitamring Jan 19, 2022
824d650
Added delete on table when deleted
eitamring Jan 30, 2022
61a148b
probably grammar
eitamring Jan 30, 2022
8cc642f
Merge pull request #2 from RiveryIO/fix/eitam/add_skip_when_table_is_…
eitamring Jan 30, 2022
05b87bb
Removed spamming log
eitamring Feb 16, 2022
2025086
Merge pull request #3 from RiveryIO/feature/eitam/removed_spaming_log
eitamring Feb 16, 2022
fa2e8fa
Added covert to string interface if needed
eitamring Feb 22, 2022
e95e5cd
Merge pull request #4 from RiveryIO/feature/eitam/support_base64_byte…
eitamring Feb 22, 2022
fdd11a6
Remove unneeded log on SP
eitamring Feb 23, 2022
d947c9e
Merge pull request #5 from RiveryIO/feautre/eitam/remove_uneeded_log_…
eitamring Feb 23, 2022
73477d3
Change mysql_date default zero based value to nil
eitamring Mar 31, 2022
eb2ea8e
Merge pull request #6 from RiveryIO/feature/eitam/add_nil_based_value…
eitamring Mar 31, 2022
007d419
Change mysql_MYSQL_TYPE_TIMESTAMP2 and MYSQL_TYPE_TIMESTAMP to be def…
eitamring Apr 7, 2022
c6c9248
Merge pull request #7 from RiveryIO/feature/eitam/add_nil_based_value…
eitamring Apr 7, 2022
d15a7ce
Change decodeDatetime2 default to zero
eitamring Apr 7, 2022
4298599
Merge pull request #8 from RiveryIO/feature/eitam/add_nil_based_value…
eitamring Apr 7, 2022
1f7d701
Change decodeTimestamp2 to return nil in case sec = 0
eitamring Apr 7, 2022
98108dd
Merge pull request #9 from RiveryIO/fix/eitam/decode_timestamp
eitamring Apr 7, 2022
411b75d
Add limit to max host name length
eitamring Apr 14, 2022
9bd5965
Merge pull request #10 from RiveryIO/feature/eitam/limit_max_host_name
eitamring Apr 14, 2022
c349655
Change log debug level
eitamring Jan 17, 2023
e7d51c3
Merge pull request #12 from RiveryIO/feature/eitam/change_log_debug_s…
eitamring Jan 17, 2023
c3593a7
add latin support
sigalikanevsky Apr 27, 2025
fd2c582
add latin support
sigalikanevsky Apr 27, 2025
3d213ae
add latin support
sigalikanevsky Apr 27, 2025
b34a98d
add latin support
sigalikanevsky Apr 27, 2025
7222b87
add latin support
sigalikanevsky Apr 27, 2025
b4851e0
add latin support
sigalikanevsky Apr 27, 2025
b939f18
move to function
sigalikanevsky Apr 27, 2025
88f0609
move to function
sigalikanevsky Apr 27, 2025
0799a57
fixes and tests
sigalikanevsky Apr 28, 2025
a2053cd
fixes
sigalikanevsky Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type MyEventHandler struct {
}

func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
log.Debugf("%s %v\n", e.Action, e.Rows)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the log level should be kept, other developers may rely on this behaviour.
Also for other files.

return nil
}

Expand Down
42 changes: 25 additions & 17 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func NewCanal(cfg *Config) (*Canal, error) {

c.ctx, c.cancel = context.WithCancel(context.Background())

if cfg.WaitTimeBetweenConnectionSeconds > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be <= 0 to set the default value?

cfg.WaitTimeBetweenConnectionSeconds = time.Duration(5) * time.Second
}

c.dumpDoneCh = make(chan struct{})
c.eventHandler = &DummyEventHandler{}
c.parser = parser.New()
Expand Down Expand Up @@ -192,6 +196,7 @@ func (c *Canal) RunFrom(pos mysql.Position) error {
return c.Run()
}

// Start from selected GTIDSet
func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {
c.master.UpdateGTIDSet(set)

Expand Down Expand Up @@ -238,15 +243,17 @@ func (c *Canal) run() error {
}

func (c *Canal) Close() {
log.Infof("closing canal")
log.Debugf("closing canal")
c.m.Lock()
defer c.m.Unlock()

c.cancel()
c.syncer.Close()
c.connLock.Lock()
c.conn.Close()
c.conn = nil
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.connLock.Unlock()

_ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true)
Expand Down Expand Up @@ -413,20 +420,21 @@ func (c *Canal) checkBinlogRowFormat() error {

func (c *Canal) prepareSyncer() error {
cfg := replication.BinlogSyncerConfig{
ServerID: c.cfg.ServerID,
Flavor: c.cfg.Flavor,
User: c.cfg.User,
Password: c.cfg.Password,
Charset: c.cfg.Charset,
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
ReadTimeout: c.cfg.ReadTimeout,
UseDecimal: c.cfg.UseDecimal,
ParseTime: c.cfg.ParseTime,
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
DisableRetrySync: c.cfg.DisableRetrySync,
TimestampStringLocation: c.cfg.TimestampStringLocation,
TLSConfig: c.cfg.TLSConfig,
ServerID: c.cfg.ServerID,
Flavor: c.cfg.Flavor,
User: c.cfg.User,
Password: c.cfg.Password,
Charset: c.cfg.Charset,
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
ReadTimeout: c.cfg.ReadTimeout,
UseDecimal: c.cfg.UseDecimal,
ParseTime: c.cfg.ParseTime,
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
DisableRetrySync: c.cfg.DisableRetrySync,
TimestampStringLocation: c.cfg.TimestampStringLocation,
TLSConfig: c.cfg.TLSConfig,
WaitTimeBetweenConnectionSeconds: c.cfg.WaitTimeBetweenConnectionSeconds,
}

if strings.Contains(c.cfg.Addr, "/") {
Expand Down
42 changes: 40 additions & 2 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type testEventHandler struct {
}

func (h *testEventHandler) OnRow(e *RowsEvent) error {
log.Infof("OnRow %s %v\n", e.Action, e.Rows)
log.Debugf("OnRow %s %v\n", e.Action, e.Rows)
umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32
if ok && (umi != umiA && umi != umiB && umi != umiC) {
return fmt.Errorf("invalid unsigned medium int %d", umi)
Expand Down Expand Up @@ -141,10 +141,48 @@ func (s *canalTestSuite) TestCanal(c *C) {
s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`")
s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18")

err := s.c.CatchMasterPos(10 * time.Second)
err := CatchMasterPos(s.c, 10*time.Second)
c.Assert(err, IsNil)
}

func CatchMasterPos(c *Canal, timeout time.Duration) error {
pos, err := c.GetMasterPos()
if err != nil {
return errors.Trace(err)
}

return WaitUntilPos(c, pos, timeout)
}

func FlushBinlog(c *Canal) error {
_, err := c.Execute("FLUSH BINARY LOGS")
return errors.Trace(err)
}

func WaitUntilPos(c *Canal, pos mysql.Position, timeout time.Duration) error {
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
return errors.Errorf("wait position %v too long > %s", pos, timeout)
default:
err := FlushBinlog(c)
if err != nil {
return errors.Trace(err)
}
curPos := c.master.Position()
if curPos.Compare(pos) >= 0 {
return nil
} else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond)
}
}
}

return nil
}

func (s *canalTestSuite) TestCanalFilter(c *C) {
// included
sch, err := s.c.GetTable("test", "canal_test")
Expand Down
3 changes: 3 additions & 0 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type Config struct {
// this configuration will not work if DisableRetrySync is true
MaxReconnectAttempts int `toml:"max_reconnect_attempts"`

// WaitTimeBetweenConnectionSeconds is the time to wait before retrying to connect.
WaitTimeBetweenConnectionSeconds time.Duration `toml:"max_wait_time_between_connection_seconds"`

// whether disable re-sync for broken connection
DisableRetrySync bool `toml:"disable_retry_sync"`

Expand Down
10 changes: 5 additions & 5 deletions canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ func (c *Canal) dump() error {
if err != nil {
return errors.Trace(err)
}
log.Infof("skip master data, get current binlog position %v", pos)
log.Debugf("skip master data, get current binlog position %v", pos)
h.name = pos.Name
h.pos = uint64(pos.Pos)
}

start := time.Now()
log.Info("try dump MySQL and parse")
log.Debug("try dump MySQL and parse")
if err := c.dumper.DumpAndParse(h); err != nil {
return errors.Trace(err)
}
Expand All @@ -185,7 +185,7 @@ func (c *Canal) dump() error {
c.master.UpdateGTIDSet(h.gset)
startPos = h.gset
}
log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
log.Debugf("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
time.Now().Sub(start).Seconds(), startPos)
return nil
}
Expand All @@ -196,12 +196,12 @@ func (c *Canal) tryDump() error {
if (len(pos.Name) > 0 && pos.Pos > 0) ||
(gset != nil && gset.String() != "") {
// we will sync with binlog name and position
log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
log.Debugf("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
return nil
}

if c.dumper == nil {
log.Info("skip dump, no mysqldump")
log.Debug("skip dump, no mysqldump")
return nil
}

Expand Down
60 changes: 16 additions & 44 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package canal

import (
"fmt"
"strings"
"sync/atomic"
"time"

Expand All @@ -22,15 +23,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
if err != nil {
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
}
log.Infof("start sync binlog at binlog file %v", pos)
log.Debugf("start sync binlog at binlog file %v", pos)
return s, nil
} else {
gsetClone := gset.Clone()
s, err := c.syncer.StartSyncGTID(gset)
if err != nil {
return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err)
}
log.Infof("start sync binlog at GTID set %v", gsetClone)
log.Debugf("start sync binlog at GTID set %v", gsetClone)
return s, nil
}
}
Expand Down Expand Up @@ -65,7 +66,7 @@ func (c *Canal) runSyncBinlog() error {
switch e := ev.Event.(type) {
case *replication.RotateEvent:
fakeRotateLogName = string(e.NextLogName)
log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
log.Debugf("received fake rotate event, next log name is %s", e.NextLogName)
}

continue
Expand All @@ -92,7 +93,7 @@ func (c *Canal) runSyncBinlog() error {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
log.Infof("rotate binlog to %s", pos)
log.Debugf("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(e); err != nil {
Expand Down Expand Up @@ -142,7 +143,15 @@ func (c *Canal) runSyncBinlog() error {
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
msg := err.Error()
if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) {
// Cut the first row from the message since it contain the procedure call and not the entire message
fl := strings.Split(msg, "\n")
log.Debugf("parse SP Error: (%s)", fl[0])
} else {
log.Debugf("parse query(%s) err %v", e.Query, err)
}
log.Debugln("will skip this event")
continue
}
for _, stmt := range stmts {
Expand Down Expand Up @@ -232,7 +241,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {

func (c *Canal) updateTable(db, table string) (err error) {
c.ClearTableCache([]byte(db), []byte(table))
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
log.Debugf("table structure changed, clear table cache: %s.%s\n", db, table)
if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
return errors.Trace(err)
}
Expand Down Expand Up @@ -270,38 +279,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
return errors.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows, e.Header)
events.Header.Gtid = c.SyncedGTIDSet()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is changed?

return c.eventHandler.OnRow(events)
}

func (c *Canal) FlushBinlog() error {
_, err := c.Execute("FLUSH BINARY LOGS")
return errors.Trace(err)
}

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
return errors.Errorf("wait position %v too long > %s", pos, timeout)
default:
err := c.FlushBinlog()
if err != nil {
return errors.Trace(err)
}
curPos := c.master.Position()
if curPos.Compare(pos) >= 0 {
return nil
} else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond)
}
}
}

return nil
}

func (c *Canal) GetMasterPos() (mysql.Position, error) {
rr, err := c.Execute("SHOW MASTER STATUS")
if err != nil {
Expand Down Expand Up @@ -336,12 +317,3 @@ func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) {
}
return gset, nil
}

func (c *Canal) CatchMasterPos(timeout time.Duration) error {
pos, err := c.GetMasterPos()
if err != nil {
return errors.Trace(err)
}

return c.WaitUntilPos(pos, timeout)
}
2 changes: 1 addition & 1 deletion dump/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (d *Dumper) Dump(w io.Writer) error {
}

args[passwordArgIndex] = "--password=******"
log.Infof("exec mysqldump with %v", args)
log.Debugf("exec mysqldump with %v", args)
args[passwordArgIndex] = passwordArg
cmd := exec.Command(d.ExecutionPath, args...)

Expand Down
Loading