Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic name is invalid when starting MySQL Source(CDC) Connector #518

Open
Ziy1-Tan opened this issue Jun 25, 2023 · 2 comments
Open

Topic name is invalid when starting MySQL Source(CDC) Connector #518

Ziy1-Tan opened this issue Jun 25, 2023 · 2 comments

Comments

@Ziy1-Tan
Copy link
Contributor

  1. Please describe the issue you observed:
  • What did you do (The steps to reproduce)?

I follow the RocketMQ Connect example1.

  • Start RocketMQ
  • Start Connect Runtime sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
  • Start MySQL Source(CDC) Connector:
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
"max.task": "1",
"connect.topicname": "debezium-mysql-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
"database.history.skip.unparseable.ddl": true,
"database.history.name.srv.addr": "localhost:9876",
"database.history.rocketmq.topic": "db-history-debezium-topic",
"database.history.store.only.monitored.tables.ddl": true,
"include.schema.changes": false,
"database.server.name": "dbserver1",
"database.port": 3306,
"database.hostname": "127.0.0.1",
"database.connectionTimeZone": "UTC",
"database.user": "debezium",
"database.password": "dbz",
"table.include.list": "inventory.employee",
"max.batch.size": 50,
"database.include.list": "inventory",
"snapshot.mode": "when_needed",
"database.server.id": "184054",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
  • What is expected to see?

The source connector and task start successfully.

  • What did you see instead?

The Source task failed to create topic: curl http://127.0.0.1:8082/connectors/list

{
    "status": 200,
    "body": {
        "MySQLCDCSource": {
            "status": {
                "name": "MySQLCDCSource",
                "connector": {
                    "state": "RUNNING",
                    "trace": null,
                    "workerId": "standalone-worker"
                },
                "tasks": [
                    {
                        "state": "FAILED",
                        "trace": "java.lang.RuntimeException: Create topic [dbserver1.inventory.employee] failed\n\tat org.apache.rocketmq.connect.runtime.utils.ConnectUtil.createTopic(ConnectUtil.java:189)\n\tat org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask.maybeCreateAndGetTopic(WorkerSourceTask.java:439)\n\tat org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask.sendRecord(WorkerSourceTask.java:237)\n\tat org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask.execute(WorkerSourceTask.java:518)\n\tat org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTask.doExecute(WorkerTask.java:119)\n\tat org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTask.doRun(WorkerTask.java:224)\n\tat org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$\nFor more information, please visit the url, http://rocketmq.apache.org/docs/faq/\n\tat org.apache.rocketmq.client.impl.MQClientAPIImpl.createTopic(MQClientAPIImpl.java:306)\n\tat org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.createAndUpdateTopicConfig(DefaultMQAdminExtImpl.java:201)\n\tat org.apache.rocketmq.tools.admin.DefaultMQAdminExt.createAndUpdateTopicConfig(DefaultMQAdminExt.java:170)\n\tat org.apache.rocketmq.connect.runtime.utils.ConnectUtil.createTopic(ConnectUtil.java:185)\n\t... 12 more\n",
                        "workerId": "standalone-worker",
                        "id": 0
                    }
                ],
                "type": "SOURCE"
            },
            "info": {
                "name": "MySQLCDCSource",
                "config": {
                    "connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
                    "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                    "include.schema.changes": "false",
                    "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
                    "connect.topicname": "debezium-mysql-source-topic",
                    "database.history.skip.unparseable.ddl": "true",
                    "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
                    "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
                    "database.user": "debezium",
                    "kafka.transforms.Unwrap.delete.handling.mode": "none",
                    "database.server.id": "184054",
                    "database.server.name": "dbserver1",
                    "database.port": "3306",
                    "max.task": "1",
                    "database.history.rocketmq.topic": "db-history-debezium-topic",
                    "database.history.name.srv.addr": "localhost:9876",
                    "database.hostname": "127.0.0.1",
                    "database.connectionTimeZone": "UTC",
                    "database.password": "dbz",
                    "database.history.store.only.monitored.tables.ddl": "true",
                    "table.include.list": "inventory.employee",
                    "max.batch.size": "50",
                    "kafka.transforms": "Unwrap",
                    "database.include.list": "inventory",
                    "snapshot.mode": "when_needed"
                },
                "tasks": [
                    {
                        "connector": "MySQLCDCSource",
                        "task": 0
                    }
                ],
                "type": "SOURCE"
            }
        }
    }
}
  1. Please tell us about your environment:
  • Ubuntu 20.04
  • RocketMQ: develop
  • RocketMQ-Connect: master
@sunxiaojian
Copy link
Contributor

@Ziy1-Tan Rocketmq does not support creating topics with special characters. You can use Transforms to reroute to a new topic

@lxb6476
Copy link

lxb6476 commented Jan 31, 2024

遇到了同样的问题,对着源码看了半天,可能有理解不到位的地方,但是至少解决了创建主题这一步的问题,以下是解决方案:
源码仓库中:模块rocketmq-connect-runtime
org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask#maybeCreateAndGetTopic方法:

private String maybeCreateAndGetTopic(ConnectRecord record) {
        String topic = overwriteTopicFromRecord(record);
        topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
// 这里把判断条件取消掉,直接从配置中获取主题名
//        if (StringUtils.isBlank(topic)) {
//            // topic from config
//            topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
//        }
        if (StringUtils.isBlank(topic)) {
            throw new ConnectException("source connect lack of topic config");
        }

        if (!workerConfig.isAutoCreateTopicEnable() || topicCache.contains(topic)) {
            return topic;
        }
        if (!ConnectUtil.isTopicExist(workerConfig, topic)) {
            ConnectUtil.createTopic(workerConfig, new TopicConfig(topic));
        }
        topicCache.add(topic);
        return topic;
    }
    ```
这里前半部分`overwriteTopicFromRecord`方法没有细看但是大概理解是将插入数据的 数据库服务名.数据库名.数据库表 名作为主题名但是因为出现了`.`符号所以至少在RocketMQ5.x版本会出现创建主题失败的问题代码这里对于主题名的修改在传入的配置参数中优先级比`overwriteTopicFromRecord`方法生成的低出现配置了"connect.topicname"但是没有生效的问题不知道后续是否会优化。。。

----
Connect感觉是个比较有用的工具但是文档中举例的demo可能是限于个人水平只运行成功了文件的同步的例子在做其他demo的时候遇到不少问题对着源码一点点打断点也没能完全解决由于时间原因最后只能放弃使用未来还是希望项目能够热度再高一些维护得更好一些

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants