Skip to content

Commit

Permalink
[INLONG-1086][Doc] Add documents for DolphinScheduler offline schedul…
Browse files Browse the repository at this point in the history
…er (#1088)
  • Loading branch information
emptyOVO authored Dec 5, 2024
1 parent 99bade7 commit fb20d83
Show file tree
Hide file tree
Showing 53 changed files with 253 additions and 44 deletions.
97 changes: 97 additions & 0 deletions docs/quick_start/offline_data_sync/dolphinscheduler_example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
---
title: Use DolphinScheduler third-party scheduling engine
sidebar_position: 2
---

In the following content, we will introduce how to use DolphinScheduler, a third-party schedule engine in Apache InLong to create offline data synchronization.

## Deployment

### Install InLong

Before we begin, we need to install InLong and a usable DolphinScheduler. Here we provide two ways:

- [Docker Deployment](deployment/docker.md) (Recommended)
- [Bare Metal Deployment](deployment/bare_metal.md)

### Add Connectors

Download the [connectors](https://inlong.apache.org/downloads/) corresponding to Flink version, and after decompression, place `sort-connector-jdbc-[version]-SNAPSHOT.jar` in `/inlong-sort/connectors/` directory.

> Currently, Apache InLong's offline data synchronization capability only supports Flink-1.18, so please download the 1.18 version of connectors.
### Operations on DolphinScheduler

Before using DolphinScheduler as your scheduling engine, please make sure you have a working DolphinScheduler on hand. If you need to deploy a DolphinScheduler for yourself, please refer to the [DolphinScheduler Official Document](https://dolphinscheduler.apache.org/zh-cn).

![DolphinScheduler Security](img/pulsar_mysql/dolphinscheduler/ds_security.png)

![DolphinScheduler Token Manager](img/pulsar_mysql/dolphinscheduler/ds_token_manager.png)

Go into Token Manager page to create a token for InLong to access.

![DolphinScheduler Token Generate](img/pulsar_mysql/dolphinscheduler/ds_token_generate.png)

Set parameters for the token according to the steps in the figure, include [Expiration Time], [User], then generate a token.

![DolphinScheduler Token Copy](img/pulsar_mysql/dolphinscheduler/ds_token_copy.png)

### Modify configuration in InLong Manager

For third-party scheduling engine, we need to modify configurations in manager.

For DolphinScheduler engine there are following configurations need to be modified:

* `schedule.engine.inlong.manager.url` : Third-party scheduling engine needs to access the inlong manager through this url.
* `schedule.engine.dolphinscheduler.url` : DolphinScheduler deployment url, general format is http://{ip}:{port}/dolphinscheduler
* `schedule.engine.dolphinscheduler.token` : Token you just generated in Token Manager of DolphinScheduler.

![InLong Manager Configuration](img/pulsar_mysql/dolphinscheduler/inlong_manager_conf.png)

After doing this, restart the InLong Manager to ensure the configuration is enabled.

### Use DolphinScheduler in offline synchronization

During configure the offline synchronization task, choose DolphinScheduler when selecting the scheduling engine, then configure other parameters.

![DolphinScheduler Task Configuration](img/pulsar_mysql/dolphinscheduler/ds_task_conf.png)

For details about how to manage clusters and configure data nodes, see [Use Quartz built-in scheduling engine](quartz_example.md).

After approval data flow, return to the [Synchronization] page and wait for the task configuration to succeed. Once configured successfully, the DolphinScheduler will periodically calls back InLong to synchronize offline data and the Manager will periodically submit Flink Batch Jobs to the Flink cluster.

![DolphinScheduler Schedule Process](img/pulsar_mysql/dolphinscheduler/ds_schedule_process.png)

![DolphinScheduler Process Success](img/pulsar_mysql/dolphinscheduler/ds_process_success.png)

![DolphinScheduler Process Instance](img/pulsar_mysql/dolphinscheduler/ds_process_instance.png)

View the DolphinScheduler task instance logs. The following logs indicate that the configuration is successful.

![DolphinScheduler Schedule Success](img/pulsar_mysql/dolphinscheduler/ds_schedule_success.png)

## Test Data

### Sending Data

Use the Pulsar SDK to produce data into the Pulsar topic. An example is as follows:

```java
// Create Pulsar client and producer
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("public/default/test").create();

// Send messages
for (int i = 0; i < 10000; i++) {
// Field separator is |
String msgStr = i + "|msg-" + i;
MessageId msgId = producer.send(msgStr.getBytes(StandardCharsets.UTF_8));
System.out.println("Send msg : " + msgStr + " with msgId: " + msgId);
}
```

### Data Validation

Then enter MySQL to check the data in the table:

![MySQL Result](img/pulsar_mysql/dolphinscheduler/mysql_result.png)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: Pulsar to MySQL Example
title: Use Quartz built-in scheduling engine
sidebar_position: 1
---

In the following content, we will introduce how to use Apache InLong to create offline data synchronization from Pulsar to MySQL through a complete example.
In the following content, we will introduce how to use built-in schedule engine in Apache InLong to create offline data synchronization from Pulsar to MySQL through a complete example.

## Deployment
### Install InLong
Expand All @@ -18,7 +18,7 @@ Download the [connectors](https://inlong.apache.org/downloads/) corresponding to
> Currently, Apache InLong's offline data synchronization capability only supports Flink-1.18, so please download the 1.18 version of connectors.
## Cluster Initialize
When all containers are successfully started, you can access the InLong dashboard address http://localhost, and use the following default account to log in.
When all containers are successfully started, you can access the InLong dashboard address `http://localhost`, and use the following default account to log in.
```
User: admin
Password: inlong
Expand All @@ -27,13 +27,13 @@ Password: inlong
### Create Cluster Tag
Click [Clusters] -> [ClusterTags] -> [Create] on the page to specify the cluster label name and person in charge.

![Create Cluster Tag](img/pulsar_mysql/cluster_tag.png)
![Create Cluster Tag](img/pulsar_mysql/quartz/cluster_tag.png)

**caution: `default_cluster` is the default ClusterTags for each component. If you decide to use a different name, make sure to update the corresponding tag configuration accordingly.**

### Register Pulsar Cluster

![Create Pulsar](img/pulsar_mysql/create_pulsar_cluster.png)
![Create Pulsar](img/pulsar_mysql/quartz/create_pulsar_cluster.png)

**You can refer to the screenshot information to fill in details such as cluster name, associated tag, and Pulsar cluster address.**

Expand All @@ -42,29 +42,37 @@ Click [Clusters] -> [ClusterTags] -> [Create] on the page to specify the cluster
Click on [Synchronization][Create], fill in the Group ID, and ensure the [Sync Type] is checked as "Offline".


![Create Offline Group](img/pulsar_mysql/create_offline_group.png)
![Create Offline Group](img/pulsar_mysql/quartz/create_offline_group.png)

### Configuration Scheduling Rules
After selecting "offline" for [Sync Type], you can configure the [Scheduling Rules] for offline tasks. Currently, two types are supported: Conventional and Crontab.
After selecting "offline" for [Sync Type], you can configure the [Scheduling Rules] for offline tasks. Scheduling Rules include two parts: [Scheduling Engine] and [Scheduling Type].

#### Scheduling Engine
Apache InLong provide several scheduling engines for users to choose from, Quartz is a build-in schedule engine in Apache InLong. Here we use quartz to handle tasks.

![Schedule Engine Type](img/pulsar_mysql/quartz/schedule_engine_type.png)

#### Schedule Type
Apache InLong supports two scheduling types currently: Conventional and Crontab.

Conventional Scheduling Configuration requires the following parameters:
- Scheduling Unit: Supports minutes, hours, days, months, years, and single execution (single execution means it will run only once).
- Scheduling Interval: Indicates the time interval between two task schedules.
- Delay Time: Indicates the delay time for task startup.
- Valid Time: Includes start time and end time; the scheduled task will only execute within this time range.

![Conventional Schedule Rule](img/pulsar_mysql/conventional_schedule.png)
![Conventional Schedule Rule](img/pulsar_mysql/quartz/conventional_schedule.png)

Crontab Scheduling requires the following parameters:
- Valid Time: Includes start time and end time; the scheduled task will only execute within this time range.
- Crontab Expression: Indicates the task cycle, e.g., `0 */5 * * * ?`
- Crontab Expression: Indicates the task cycle, e.g. `0 */5 * * * ?`

![Cron Schedule Rule](img/pulsar_mysql/cron_schedule.png)
![Cron Schedule Rule](img/pulsar_mysql/quartz/cron_schedule.png)

### Create Data Source
In the data source section, click [Create][Pulsar], and configure the data source name, Pulsar tenant, namespace, topic, admin URL, service URL, data format, and other parameters.

![Create Source](img/pulsar_mysql/source.png)
![Create Source](img/pulsar_mysql/quartz/source.png)

Note: The Pulsar topic needs to be created in the Pulsar cluster in advance (or enable the automatic topic creation feature in the Pulsar cluster).

Expand All @@ -82,25 +90,25 @@ CREATE TABLE sink_table (

In the data sink section, click [Create][MySQL], and configure the data sink name, database name, and table name (test.sink_table), among other information.

![Create Sink](img/pulsar_mysql/sink.png)
![Create Sink](img/pulsar_mysql/quartz/sink.png)

### Configure source and sink fields

Configure schema mapping information in the [Source fields] and [Sink fields] sections, and click [Submit] for approval.

![Create Source Fields](img/pulsar_mysql/source_fields.png)
![Create Source Fields](img/pulsar_mysql/quartz/source_fields.png)

![Create Sink Fields](img/pulsar_mysql/sink_fields.png)
![Create Sink Fields](img/pulsar_mysql/quartz/sink_fields.png)

### Approval data flow

On the page, click [Approval] -> [My Approvals] -> [Approve][OK].

![Approve](img/pulsar_mysql/approve.png)
![Approve](img/pulsar_mysql/quartz/approve.png)

Return to the [Synchronization] page and wait for the task configuration to succeed. Once configured successfully, the Manager will periodically submit Flink Batch Jobs to the Flink cluster.

![Flink Batch Job](img/pulsar_mysql/flink_batch_job.png)
![Flink Batch Job](img/pulsar_mysql/quartz/flink_batch_job.png)

## Test Data
### Sending Data
Expand All @@ -124,4 +132,4 @@ Use the Pulsar SDK to produce data into the Pulsar topic. An example is as follo

Then enter MySQL to check the data in the table:

![Mysql Sink](img/pulsar_mysql/mysql_sink.png)
![Mysql Sink](img/pulsar_mysql/quartz/mysql_sink.png)
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
---
title: 使用第三方 DolphinScheduler 调度引擎
sidebar_position: 2
---

在接下来的内容中,我们将介绍如何在 Apache InLong 中集成第三方调度引擎 DolphinScheduler 来创建离线数据同步。

## 环境部署

### 安装 InLong

在开始之前,我们需要安装 InLong 的全部组件,这里提供两种方式:

- [Docker 部署](deployment/docker.md)(推荐)
- [Bare Metal 部署](deployment/bare_metal.md)

### 添加 Connectors

下载与 Flink 版本对应的 [connectors](https://inlong.apache.org/zh-CN/downloads),解压后将 `sort-connector-jdbc-[version]-SNAPSHOT.jar` 放在 `/inlong-sort/connectors/` 目录下。
> 当前 Apache InLong 的离线数据同步能力只支持 Flink-1.18 版本,所以请下载 1.18 版本的 connectors。
### 在 DolphinScheduler 上的操作

在使用 DolphinScheduler 作为调度引擎之前,请确保有可以提供服务的 DolphinScheduler 。如果您需要为自己部署一个 DolphinScheduler,请参考 [DolphinScheduler 官方文档](https://dolphinscheduler.apache.org/zh-cn)

![DolphinScheduler Security](img/pulsar_mysql/dolphinscheduler/ds_security.png)

![DolphinScheduler Token Manager](img/pulsar_mysql/dolphinscheduler/ds_token_manager.png)

进入令牌管理器页面创建一个令牌供 InLong 进行访问。

![DolphinScheduler Token Generate](img/pulsar_mysql/dolphinscheduler/ds_token_generate.png)

按照图中步骤设置 token 参数,包括 [过期时间][用户],并生成令牌。

![DolphinScheduler Token Copy](img/pulsar_mysql/dolphinscheduler/ds_token_copy.png)

### 修改 InLong Manager 中的配置

对于第三方调度引擎,我们需要在 InLong Manager 中修改配置。

对于 DolphinScheduler 引擎,需要修改以下配置:

* `schedule.engine.inlong.manager.url` : 第三方调度引擎需要通过该 url 访问 InLong Manager。
* `schedule.engine.dolphinscheduler.url` : DolphinScheduler 部署的 url,一般格式为 http://{ip}:{port}/dolphinScheduler
* `schedule.engine.dolphinscheduler.token` : 上文中在 DolphinScheduler 令牌管理器页面中生成的 Token。

![InLong Manager Configuration](img/pulsar_mysql/dolphinscheduler/inlong_manager_conf.png)

完成这些操作后,重新启动 InLong Manager 以确保配置成功启用。

### 在离线同步任务中使用 DolphinScheduler

在配置离线同步任务时,在选择调度引擎时选择 DolphinScheduler,然后配置其他参数。

![DolphinScheduler Task Configuration](img/pulsar_mysql/dolphinscheduler/ds_task_conf.png)

集群管理和相关数据节点的配置请参见[使用内置的 Quartz 调度引擎](quartz_example.md)

审批数据流后,返回【数据同步】页面,等待任务配置成功,配置成功后,DolphinScheduler 将定期回调 InLong Manager,并由 InLong Manager 周期提交 Flink Batch Job 到 Flink 集群。

![DolphinScheduler Schedule Process](img/pulsar_mysql/dolphinscheduler/ds_schedule_process.png)

![DolphinScheduler Process Success](img/pulsar_mysql/dolphinscheduler/ds_process_success.png)

![DolphinScheduler Process Instance](img/pulsar_mysql/dolphinscheduler/ds_process_instance.png)

查看 DolphinScheduler 任务实例日志,输出如下日志表示配置成功。

![DolphinScheduler Schedule Success](img/pulsar_mysql/dolphinscheduler/ds_schedule_success.png)

## 测试数据

### 发送数据

通过 Pulsar SDK 生产数据写入的 Pulsar topic 中,示例如下:

```java
// Create Pulsar client and producer
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("public/default/test").create();

// Send messages
for (int i = 0; i < 10000; i++) {
// Field separator is |
String msgStr = i + "|msg-" + i;
MessageId msgId = producer.send(msgStr.getBytes(StandardCharsets.UTF_8));
System.out.println("Send msg : " + msgStr + " with msgId: " + msgId);
}
```

### 数据验证

然后进入 Mysql,查看库表数据,可以看到数据已经同步到 MySQL 中。

![MySQL Result](img/pulsar_mysql/dolphinscheduler/mysql_result.png)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit fb20d83

Please sign in to comment.