Skip to content

Commit

Permalink
合并master分支
Browse files Browse the repository at this point in the history
  • Loading branch information
qiao.zeng committed Nov 12, 2023
2 parents b1aa12b + e24a582 commit a8be274
Show file tree
Hide file tree
Showing 73 changed files with 1,555 additions and 896 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: KnowStreaming Build

on:
push:
branches: [ "master", "ve_3.x", "ve_demo_3.x" ]
branches: [ "*" ]
pull_request:
branches: [ "master", "ve_3.x", "ve_demo_3.x" ]
branches: [ "*" ]

jobs:
build:
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@

**点击 [这里](https://doc.knowstreaming.com/product),也可以从官网获取到更多文档**


**`产品网址`**
- [产品官网:https://knowstreaming.com](https://knowstreaming.com)
- [体验环境:https://demo.knowstreaming.com](https://demo.knowstreaming.com),登录账号:admin/admin



Expand Down
5 changes: 5 additions & 0 deletions docs/install_guide/版本升级手册.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `l
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');


-- 多集群管理权限2023-07-18新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2052', 'Security-User查看密码', '1593', '1', '2', 'Security-User查看密码', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2052', '0', 'know-streaming');
```

### 升级至 `3.3.0` 版本
Expand Down
33 changes: 31 additions & 2 deletions docs/user_guide/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- [1、支持哪些 Kafka 版本?](#1支持哪些-kafka-版本)
- [1、2.x 版本和 3.0 版本有什么差异?](#12x-版本和-30-版本有什么差异)
- [3、页面流量信息等无数据?](#3页面流量信息等无数据)
- [8.4、`Jmx`连接失败如何解决?](#84jmx连接失败如何解决)
- [4、`Jmx`连接失败如何解决?](#4jmx连接失败如何解决)
- [5、有没有 API 文档?](#5有没有-api-文档)
- [6、删除 Topic 成功后,为何过段时间又出现了?](#6删除-topic-成功后为何过段时间又出现了)
- [7、如何在不登录的情况下,调用接口?](#7如何在不登录的情况下调用接口)
Expand All @@ -21,6 +21,7 @@
- [15、测试时使用Testcontainers的说明](#15测试时使用testcontainers的说明)
- [16、JMX连接失败怎么办](#16jmx连接失败怎么办)
- [17、zk监控无数据问题](#17zk监控无数据问题)
- [18、启动失败,报NoClassDefFoundError如何解决](#18启动失败报noclassdeffounderror如何解决)

## 1、支持哪些 Kafka 版本?

Expand Down Expand Up @@ -57,7 +58,7 @@

 

## 8.4、`Jmx`连接失败如何解决?
## 4、`Jmx`连接失败如何解决?

- 参看 [Jmx 连接配置&问题解决](https://doc.knowstreaming.com/product/9-attachment#91jmx-%E8%BF%9E%E6%8E%A5%E5%A4%B1%E8%B4%A5%E9%97%AE%E9%A2%98%E8%A7%A3%E5%86%B3) 说明。

Expand Down Expand Up @@ -278,3 +279,31 @@ zookeeper集群正常,但Ks上zk页面所有监控指标无数据,`KnowStrea
```
4lw.commands.whitelist=*
```
## 18、启动失败,报NoClassDefFoundError如何解决
**错误现象:**
```log
# 启动失败,报nested exception is java.lang.NoClassDefFoundError: Could not initialize class com.didiglobal.logi.job.core.WorkerSingleton$Singleton
2023-08-11 22:54:29.842 [main] ERROR class=org.springframework.boot.SpringApplication||Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'quartzScheduler' defined in class path resource [com/didiglobal/logi/job/LogIJobAutoConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.didiglobal.logi.job.core.Scheduler]: Factory method 'quartzScheduler' threw exception; nested exception is java.lang.NoClassDefFoundError: Could not initialize class com.didiglobal.logi.job.core.WorkerSingleton$Singleton
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:657)
```


**问题原因:**
1. `KnowStreaming` 依赖的 `Logi-Job` 初始化 `WorkerSingleton$Singleton` 失败。
2. `WorkerSingleton$Singleton` 初始化的过程中,会去获取一些操作系统的信息,如果获取时出现了异常,则会导致 `WorkerSingleton$Singleton` 初始化失败。


**临时建议:**

`Logi-Job` 问题的修复时间不好控制,之前我们测试验证了一下,在 `Windows`、`Mac`、`CentOS` 这几个操作系统下基本上都是可以正常运行的。

所以,如果有条件的话,可以暂时先使用这几个系统部署 `KnowStreaming`。

如果在 `Windows`、`Mac`、`CentOS` 这几个操作系统下也出现了启动失败的问题,可以重试 2-3 次看是否还是启动失败,或者换一台机器试试。


Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import org.apache.kafka.connect.runtime.AbstractStatus;
Expand All @@ -30,6 +31,9 @@ public class ConnectorManagerImpl implements ConnectorManager {
@Autowired
private ConnectorService connectorService;

@Autowired
private OpConnectorService opConnectorService;

@Autowired
private WorkerConnectorService workerConnectorService;

Expand All @@ -44,37 +48,37 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
}

return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
return opConnectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
}

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}

Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
}

connectorService.addNewToDB(ksConnectorResult.getData());
opConnectorService.addNewToDB(ksConnectorResult.getData());
return Result.buildSuc();
}

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}

Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
}
Expand All @@ -83,7 +87,7 @@ public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName
connector.setCheckpointConnectorName(checkpointName);
connector.setHeartbeatConnectorName(heartbeatName);

connectorService.addNewToDB(connector);
opConnectorService.addNewToDB(connector);
return Result.buildSuc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
@Autowired
private ConnectorService connectorService;

@Autowired
private OpConnectorService opConnectorService;

@Autowired
private WorkerConnectorService workerConnectorService;

Expand Down Expand Up @@ -156,20 +160,20 @@ public Result<Void> deleteMirrorMaker(Long connectClusterId, String sourceConnec

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -181,20 +185,20 @@ public Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) {
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) {
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
return opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
}

@Override
Expand All @@ -206,20 +210,20 @@ public Result<Void> restartMirrorMaker(Long connectClusterId, String sourceConne

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -231,20 +235,20 @@ public Result<Void> stopMirrorMaker(Long connectClusterId, String sourceConnecto

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -256,20 +260,20 @@ public Result<Void> resumeMirrorMaker(Long connectClusterId, String sourceConnec

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public List<ConnectorMetrics> collectConnectMetrics(ConnectCluster connectCluste
Long connectClusterId = connectCluster.getId();

List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode());
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectClusterId);
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectCluster);

FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(connectClusterId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
Expand All @@ -12,20 +11,18 @@
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConnectClusterMetrics extends BaseMetrics {
private Long connectClusterId;
protected Long connectClusterId;

public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId){
public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId ){
super(clusterPhyId);
this.connectClusterId = connectClusterId;
}

public static ConnectClusterMetrics initWithMetric(Long connectClusterId, String metric, Float value) {
ConnectClusterMetrics brokerMetrics = new ConnectClusterMetrics(connectClusterId, connectClusterId);
brokerMetrics.putMetric(metric, value);
return brokerMetrics;
public ConnectClusterMetrics(Long connectClusterId, String metricName, Float metricValue) {
this(null, connectClusterId);
this.putMetric(metricName, metricValue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
Expand All @@ -11,25 +9,19 @@
* @date 2022/11/2
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ConnectWorkerMetrics extends BaseMetrics {

private Long connectClusterId;

public class ConnectWorkerMetrics extends ConnectClusterMetrics {
private String workerId;

public static ConnectWorkerMetrics initWithMetric(Long connectClusterId, String workerId, String metric, Float value) {
ConnectWorkerMetrics connectWorkerMetrics = new ConnectWorkerMetrics();
connectWorkerMetrics.setConnectClusterId(connectClusterId);
connectWorkerMetrics.setWorkerId(workerId);
connectWorkerMetrics.putMetric(metric, value);
return connectWorkerMetrics;
public ConnectWorkerMetrics(Long connectClusterId, String workerId, String metricName, Float metricValue) {
super(null, connectClusterId);
this.workerId = workerId;
this.putMetric(metricName, metricValue);
}

@Override
public String unique() {
return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
return "KCW@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
}
}
Loading

0 comments on commit a8be274

Please sign in to comment.