Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用sqlparser替换正则进行queryevent的解析 #382

Merged
merged 19 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/dump"
Expand All @@ -28,6 +29,7 @@ type Canal struct {

cfg *Config

parser *parser.Parser
lintanghui marked this conversation as resolved.
Show resolved Hide resolved
master *masterInfo
dumper *dump.Dumper
dumped bool
Expand Down Expand Up @@ -65,7 +67,7 @@ func NewCanal(cfg *Config) (*Canal, error) {

c.dumpDoneCh = make(chan struct{})
c.eventHandler = &DummyEventHandler{}

c.parser = parser.New()
c.tables = make(map[string]*schema.Table)
if c.cfg.DiscardNoMetaRowEvent {
c.errorTablesGetTime = make(map[string]time.Time)
Expand Down Expand Up @@ -408,17 +410,17 @@ func (c *Canal) checkBinlogRowFormat() error {

func (c *Canal) prepareSyncer() error {
cfg := replication.BinlogSyncerConfig{
ServerID: c.cfg.ServerID,
Flavor: c.cfg.Flavor,
User: c.cfg.User,
Password: c.cfg.Password,
Charset: c.cfg.Charset,
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
ReadTimeout: c.cfg.ReadTimeout,
UseDecimal: c.cfg.UseDecimal,
ParseTime: c.cfg.ParseTime,
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
ServerID: c.cfg.ServerID,
Flavor: c.cfg.Flavor,
User: c.cfg.User,
Password: c.cfg.Password,
Charset: c.cfg.Charset,
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
ReadTimeout: c.cfg.ReadTimeout,
UseDecimal: c.cfg.UseDecimal,
ParseTime: c.cfg.ParseTime,
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
TimestampStringLocation: c.cfg.TimestampStringLocation,
}

Expand Down
162 changes: 101 additions & 61 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package canal

import (
"bytes"
"flag"
"fmt"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-mysql/mysql"
)
Expand Down Expand Up @@ -140,98 +140,138 @@ func (s *canalTestSuite) TestCanalFilter(c *C) {

func TestCreateTableExp(t *testing.T) {
cases := []string{
"CREATE TABLE `mydb.mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE `mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE IF NOT EXISTS `mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE IF NOT EXISTS mytable (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE /*generated by server */ mydb.mytable (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE `mydb`.`mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE IF NOT EXISTS mydb.`mytable` (`id` int(10)) ENGINE=InnoDB",
"CREATE TABLE IF NOT EXISTS `mydb`.mytable (`id` int(10)) ENGINE=InnoDB",
}
table := []byte("mytable")
db := []byte("mydb")
table := "mytable"
db := "mydb"
pr := parser.New()
for _, s := range cases {
m := expCreateTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
t.Fatalf("TestCreateTableExp: case %s failed\n", s)
stmts, _, err := pr.Parse(s, "", "")
if err != nil {
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
if len(nodes) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if len(nodes) != 1, then fatal? similar with others.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one stmt may contains multi tables, eg: rename a to b,c to d; then nodes will contain two nodes a and c.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, I think in these cases, we must assert the number of the nodes is what we want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what we need to do is to range nodes and then clear old table cache, rather than care about the number if nodes. if len(nodes)<0 range it, if len(nodes)==0 do nothing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In test cases, we must ensure parseStmt works correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, some test had been writed to test parseStmt

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some test had been writed to test parseStmt

so, test the length of return nodes too?

continue
}
if nodes[0].db != db || nodes[0].table != table {
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
}
}
}
}

func TestAlterTableExp(t *testing.T) {
cases := []string{
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE /*generated by server*/ `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
}

table := []byte("mytable")
db := []byte("mydb")
table := "mytable"
db := "mydb"
pr := parser.New()
for _, s := range cases {
m := expAlterTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
t.Fatalf("TestAlterTableExp: case %s failed\n", s)
stmts, _, err := pr.Parse(s, "", "")
if err != nil {
t.Fatalf("TestAlterTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
if len(nodes) == 0 {
continue
}
rdb := nodes[0].db
rtable := nodes[0].table
if (len(rdb) > 0 && rdb != db) || rtable != table {
t.Fatalf("TestAlterTableExp:case %s failed db %s,table %s\n", s, rdb, rtable)
}
}
}
}

func TestRenameTableExp(t *testing.T) {
cases := []string{
"rename table `mydb`.`mytable` to `mydb`.`mytable1`",
"rename table `mytable` to `mytable1`",
"rename table mydb.mytable to mydb.mytable1",
"rename table mytable to mytable1",

"rename table `mydb`.`mytable` to `mydb`.`mytable2`, `mydb`.`mytable3` to `mydb`.`mytable1`",
"rename table `mytable` to `mytable2`, `mytable3` to `mytable1`",
"rename table mydb.mytable to mydb.mytable2, mydb.mytable3 to mydb.mytable1",
"rename table mytable to mytable2, mytable3 to mytable1",
"rename /* generate by server */table `mydb`.`mytable0` to `mydb`.`mytable0tmp`",
"rename table `mytable0` to `mytable0tmp`",
"rename table mydb.mytable0 to mydb.mytable0tmp",
"rename table mytable0 to mytable0tmp",

"rename table `mydb`.`mytable0` to `mydb`.`mytable0tmp`, `mydb`.`mytable1` to `mydb`.`mytable1tmp`",
"rename table `mytable0` to `mytable0tmp`, `mytable1` to `mytable1tmp`",
"rename table mydb.mytable0 to mydb.mytable0tmp, mydb.mytable1 to mydb.mytable1tmp",
"rename table mytable0 to mytable0tmp, mytable1 to mytabletmp",
}
table := []byte("mytable")
db := []byte("mydb")
baseTable := "mytable"
db := "mydb"
pr := parser.New()
for _, s := range cases {
m := expRenameTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
t.Fatalf("TestRenameTableExp: case %s failed\n", s)
stmts, _, err := pr.Parse(s, "", "")
if err != nil {
t.Fatalf("TestRenameTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
if len(nodes) == 0 {
continue
}
for i, node := range nodes {
rdb := node.db
rtable := node.table
table := fmt.Sprintf("%s%d", baseTable, i)
if (len(rdb) > 0 && rdb != db) || rtable != table {
t.Fatalf("TestRenameTableExp:case %s failed db %s,table %s\n", s, rdb, rtable)
}
}
}
}
}

func TestDropTableExp(t *testing.T) {
cases := []string{
"drop table test1",
"DROP TABLE test1",
"DROP TABLE test1",
"DROP table IF EXISTS test.test1",
"drop table `test1`",
"DROP TABLE `test1`",
"DROP table IF EXISTS `test`.`test1`",
"DROP TABLE `test1` /* generated by server */",
"DROP table if exists test1",
"DROP table if exists `test1`",
"DROP table if exists test.test1",
"DROP table if exists `test`.test1",
"DROP table if exists `test`.`test1`",
"DROP table if exists test.`test1`",
"DROP table if exists test.`test1`",
"drop table test0",
"DROP TABLE test0",
"DROP TABLE test0",
"DROP table IF EXISTS test.test0",
"drop table `test0`",
"DROP TABLE `test0`",
"DROP table IF EXISTS `test`.`test0`",
"DROP TABLE `test0` /* generated by server */",
"DROP /*generated by server */ table if exists test0",
"DROP table if exists `test0`",
"DROP table if exists test.test0",
"DROP table if exists `test`.test0",
"DROP table if exists `test`.`test0`",
"DROP table if exists test.`test0`",
"DROP table if exists test.`test0`",
}

table := []byte("test1")
baseTable := "test"
db := "test"
pr := parser.New()
for _, s := range cases {
m := expDropTable.FindSubmatch([]byte(s))
mLen := len(m)
if m == nil {
t.Fatalf("TestDropTableExp: case %s failed\n", s)
return
}
if mLen < 4 {
t.Fatalf("TestDropTableExp: case %s failed\n", s)
return
stmts, _, err := pr.Parse(s, "", "")
if err != nil {
t.Fatalf("TestDropTableExp:case %s failed\n", s)
}
if !bytes.Equal(m[mLen-1], table) {
t.Fatalf("TestDropTableExp: case %s failed\n", s)
for _, st := range stmts {
nodes := parseStmt(st)
if len(nodes) == 0 {
continue
}
for i, node := range nodes {
rdb := node.db
rtable := node.table
table := fmt.Sprintf("%s%d", baseTable, i)
if (len(rdb) > 0 && rdb != db) || rtable != table {
t.Fatalf("TestDropTableExp:case %s failed db %s,table %s\n", s, rdb, rtable)
}
}
}
}
}
48 changes: 48 additions & 0 deletions canal/expr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package canal

import (
"io"

"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/format"
)

func init() {
ast.NewValueExpr = newValueExpr
ast.NewParamMarkerExpr = newParamExpr
ast.NewDecimal = func(_ string) (interface{}, error) {
return nil, nil
}
ast.NewHexLiteral = func(_ string) (interface{}, error) {
return nil, nil
}
ast.NewBitLiteral = func(_ string) (interface{}, error) {
return nil, nil
}
}

type paramExpr struct {
valueExpr
}

func newParamExpr(_ int) ast.ParamMarkerExpr {
return &paramExpr{}
}
func (pe *paramExpr) SetOrder(o int) {}

type valueExpr struct {
ast.TexprNode
}

func newValueExpr(_ interface{}) ast.ValueExpr { return &valueExpr{} }
func (ve *valueExpr) SetValue(val interface{}) {}
func (ve *valueExpr) GetValue() interface{} { return nil }
func (ve *valueExpr) GetDatumString() string { return "" }
func (ve *valueExpr) GetString() string { return "" }
func (ve *valueExpr) GetProjectionOffset() int { return 0 }
func (ve *valueExpr) SetProjectionOffset(offset int) {}
func (ve *valueExpr) Restore(ctx *format.RestoreCtx) error { return nil }
func (ve *valueExpr) Accept(v ast.Visitor) (node ast.Node, ok bool) { return }
func (ve *valueExpr) Text() string { return "" }
func (ve *valueExpr) SetText(text string) {}
func (ve *valueExpr) Format(w io.Writer) {}
Loading