Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用sqlparser替换正则进行queryevent的解析 #382

Merged
merged 19 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- "1.11"
- "1.12"

addons:
apt:
Expand Down
99 changes: 0 additions & 99 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package canal

import (
"bytes"
"flag"
"fmt"
"testing"
Expand Down Expand Up @@ -133,101 +132,3 @@ func (s *canalTestSuite) TestCanalFilter(c *C) {
c.Assert(errors.Cause(err), Equals, ErrExcludedTable)
c.Assert(sch, IsNil)
}

func TestCreateTableExp(t *testing.T) {
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
cases := []string{
"CREATE TABLE `mydb.mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE `mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE IF NOT EXISTS `mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE IF NOT EXISTS mytable (`id` int(10)) ENGINE=InnoDB",
}
table := []byte("mytable")
db := []byte("mydb")
for _, s := range cases {
m := expCreateTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
t.Fatalf("TestCreateTableExp: case %s failed\n", s)
}
}
}

func TestAlterTableExp(t *testing.T) {
cases := []string{
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
}

table := []byte("mytable")
db := []byte("mydb")
for _, s := range cases {
m := expAlterTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
t.Fatalf("TestAlterTableExp: case %s failed\n", s)
}
}
}

func TestRenameTableExp(t *testing.T) {
cases := []string{
"rename table `mydb`.`mytable` to `mydb`.`mytable1`",
"rename table `mytable` to `mytable1`",
"rename table mydb.mytable to mydb.mytable1",
"rename table mytable to mytable1",

"rename table `mydb`.`mytable` to `mydb`.`mytable2`, `mydb`.`mytable3` to `mydb`.`mytable1`",
"rename table `mytable` to `mytable2`, `mytable3` to `mytable1`",
"rename table mydb.mytable to mydb.mytable2, mydb.mytable3 to mydb.mytable1",
"rename table mytable to mytable2, mytable3 to mytable1",
}
table := []byte("mytable")
db := []byte("mydb")
for _, s := range cases {
m := expRenameTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
t.Fatalf("TestRenameTableExp: case %s failed\n", s)
}
}
}

func TestDropTableExp(t *testing.T) {
cases := []string{
"drop table test1",
"DROP TABLE test1",
"DROP TABLE test1",
"DROP table IF EXISTS test.test1",
"drop table `test1`",
"DROP TABLE `test1`",
"DROP table IF EXISTS `test`.`test1`",
"DROP TABLE `test1` /* generated by server */",
"DROP table if exists test1",
"DROP table if exists `test1`",
"DROP table if exists test.test1",
"DROP table if exists `test`.test1",
"DROP table if exists `test`.`test1`",
"DROP table if exists test.`test1`",
"DROP table if exists test.`test1`",
}

table := []byte("test1")
for _, s := range cases {
m := expDropTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil {
t.Fatalf("TestDropTableExp: case %s failed\n", s)
return
}
if mLen < 4 {
t.Fatalf("TestDropTableExp: case %s failed\n", s)
return
}
if !bytes.Equal(m[mLen-1], table) {
t.Fatalf("TestDropTableExp: case %s failed\n", s)
}
}
}
91 changes: 51 additions & 40 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package canal

import (
"fmt"
"regexp"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
_ "github.com/pingcap/tidb/types/parser_driver"
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
"github.com/satori/go.uuid"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-mysql/mysql"
Expand All @@ -14,11 +16,7 @@ import (
)

var (
expCreateTable = regexp.MustCompile("(?i)^CREATE\\sTABLE(\\sIF\\sNOT\\sEXISTS)?\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
expRenameTable = regexp.MustCompile("(?i)^RENAME\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s{1,}TO\\s.*?")
expDropTable = regexp.MustCompile("(?i)^DROP\\sTABLE(\\sIF\\sEXISTS){0,1}\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}(?:$|\\s)")
expTruncateTable = regexp.MustCompile("(?i)^TRUNCATE\\s+(?:TABLE\\s+)?(?:`?([^`\\s]+)`?\\.`?)?([^`\\s]+)`?")
sqlParse = parser.New()
)

func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
Expand Down Expand Up @@ -122,42 +120,38 @@ func (c *Canal) runSyncBinlog() error {
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
}
var (
mb [][]byte
db []byte
table []byte
)
regexps := []regexp.Regexp{*expCreateTable, *expAlterTable, *expRenameTable, *expDropTable, *expTruncateTable}
for _, reg := range regexps {
mb = reg.FindSubmatch(e.Query)
if len(mb) != 0 {
break
}
}
mbLen := len(mb)
if mbLen == 0 {
stmts, _, err := sqlParse.Parse(string(e.Query), "", "")
if err != nil {
log.Errorf("parse query err %v", err)
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// the first last is table name, the second last is database name(if exists)
if len(mb[mbLen-2]) == 0 {
db = e.Schema
} else {
db = mb[mbLen-2]
}
table = mb[mbLen-1]

savePos = true
force = true
c.ClearTableCache(db, table)
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
if err = c.eventHandler.OnTableChanged(string(db), string(table)); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
return errors.Trace(err)
}

// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(pos, e); err != nil {
return errors.Trace(err)
for _, stmt := range stmts {
switch t := stmt.(type) {
case *ast.RenameTableStmt:
for _, tableInfo := range t.TableToTables {
db := tableInfo.OldTable.Schema.String()
table := tableInfo.OldTable.Name.String()
c.updateTable(db, table, e, pos, ev.Header.Timestamp)
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
}
case *ast.AlterTableStmt:
db := t.Table.Schema.String()
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
table := t.Table.Name.String()
c.updateTable(db, table, e, pos, ev.Header.Timestamp)
case *ast.DropTableStmt:
for _, table := range t.Tables {
db := table.Schema.String()
table := table.Name.String()
c.updateTable(db, table, e, pos, ev.Header.Timestamp)
}
case *ast.CreateTableStmt:
db := t.Table.Schema.String()
table := t.Table.Name.String()
c.updateTable(db, table, e, pos, ev.Header.Timestamp)
case *ast.TruncateTableStmt:
db := t.Table.Schema.String()
table := t.Table.Name.String()
c.updateTable(db, table, e, pos, ev.Header.Timestamp)
}
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
}
default:
continue
Expand All @@ -174,7 +168,24 @@ func (c *Canal) runSyncBinlog() error {

return nil
}
func (c *Canal) updateTable(db, table string, e *replication.QueryEvent, pos mysql.Position, ts uint32) (err error) {
c.ClearTableCache([]byte(db), []byte(table))
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
if err = c.eventHandler.OnTableChanged(string(db), string(table)); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
return errors.Trace(err)
}

// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(pos, e); err != nil {
return errors.Trace(err)
}
c.master.Update(pos)
c.master.UpdateTimestamp(ts)
if err := c.eventHandler.OnPosSynced(pos, true); err != nil {
return errors.Trace(err)
}
return
}
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
ev := e.Event.(*replication.RowsEvent)

Expand Down
36 changes: 36 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,45 @@ module github.com/siddontang/go-mysql

require (
github.com/BurntSushi/toml v0.3.1
github.com/coreos/bbolt v1.3.2 // indirect
github.com/coreos/etcd v3.3.12+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8 // indirect
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.8.5 // indirect
github.com/jmoiron/sqlx v1.2.0
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.0
github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea // indirect
github.com/pingcap/parser v0.0.0-20190424024541-e2cdb851bce2
github.com/pingcap/tidb v0.0.0-20190108123336-c68ee7318319
github.com/pingcap/tipb v0.0.0-20190425035018-fe02cdb06c6a // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/prometheus/client_golang v0.9.2 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect
github.com/satori/go.uuid v1.2.0
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
github.com/sirupsen/logrus v1.4.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.2 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/appengine v1.5.0 // indirect
google.golang.org/grpc v1.20.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
Loading