[INLONG-1091][Doc] Unify the title and introduction (#1092)
Zkplo authored Dec 9, 2024
1 parent 8d250a6 commit 48702b5
Showing 17 changed files with 155 additions and 73 deletions.
@@ -1,8 +1,9 @@
---
title: Example of Airflow Offline Synchronization
title: Airflow Scheduling Engine Example
sidebar_position: 3
---
In the following content, a complete example will be used to introduce how to create Airflow scheduling tasks using Apache InLong and complete offline data synchronization from Pulsar to MySQL.

In the following sections, we will walk through a complete example to demonstrate how to integrate the third-party scheduling engine Airflow into Apache InLong to create an offline data synchronization from Pulsar to MySQL.

## Deployment
### Install InLong
@@ -17,17 +18,22 @@ Download the [connectors](https://inlong.apache.org/downloads/) corresponding to
> Currently, Apache InLong's offline data synchronization capability only supports Flink-1.18, so please download the 1.18 version of connectors.
## Create Clusters And Data Target
When all containers are successfully started, you can access the InLong dashboard at `http://localhost` and log in with the following default account.
```
User: admin
Password: inlong
```

### Create Cluster Label
![airflow_create_cluster_labels](img/pulsar_mysql/airflow/airflow_create_cluster_labels.png)
### Create Cluster Tag
![Airflow Create Cluster Tag](img/pulsar_mysql/airflow/airflow_create_cluster_tag.png)

### Register Pulsar Cluster

![airflow_create_pulsar_cluster](img/pulsar_mysql/airflow/airflow_create_pulsar_cluster.png)
![Airflow Create Pulsar Cluster](img/pulsar_mysql/airflow/airflow_create_pulsar_cluster.png)

### Create Data Target

![airflow_create_data_target](img/pulsar_mysql/airflow/airflow_create_data_target.png)
![Airflow Create Data Target](img/pulsar_mysql/airflow/airflow_create_data_target.png)

Execute the following SQL statement:

@@ -39,71 +45,73 @@ CREATE TABLE sink_table (
);
```

## Airflow Initialization
## Airflow Initialize

### Get Initial DAG
### Get Original DAG

`dag_creator` and `dag_cleaner` can be obtained from [Inlong](https://github.com/apache/inlong).
`dag_creator` and `dag_cleaner` can be obtained from [InLong](https://github.com/apache/inlong).

![airflow_get_DAGs](img/pulsar_mysql/airflow/airflow_get_DAGs.jpg)
![Airflow Get DAGs](img/pulsar_mysql/airflow/airflow_get_DAGs.jpg)

> Airflow does not provide an API for DAG creation, so two original DAGs are required. `dag_creator` is used to create offline tasks, and `dag_cleaner` is used to clean up offline tasks regularly.
### Create Initial DAG
### Create Original DAG

Place the DAG file in the Airflow default DAG directory and wait for a while. The Airflow scheduler will scan the directory and load the DAG:
![airflow_original_DAG](img/pulsar_mysql/airflow/airflow_original_DAG.png)
![Airflow Original DAG](img/pulsar_mysql/airflow/airflow_original_DAG.png)

### Airflow REST API

By default, Airflow will reject all REST API requests. Please refer to the [Airflow official documentation](https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/api-authentication.html) for configuration.
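As one hedged example — the exact backend module path depends on your Airflow version and auth manager, so verify it against the linked documentation — enabling HTTP basic authentication for the REST API alongside session auth can look like this in `airflow.cfg`:

```properties
[api]
# Accept username/password (HTTP basic) credentials for REST calls while
# keeping session auth for the web UI; with older Airflow versions the
# backend is airflow.api.auth.backend.basic_auth instead.
auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.basic_auth,airflow.api.auth.backend.session
```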

### Inlong Manager Configuration
### Configure InLong Manager

Modify the configuration file as required below and restart Inlong Manager.
Modify the configuration file as required below and restart InLong Manager.
```properties
# Inlong Manager URL accessible by the scheduler
# InLong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://inlongManagerIp:inlongManagerPort
# Management URL for Airflow
schedule.engine.airflow.baseUrl=http://airflowIP:airflowPort
# Username and password for Airflow REST API authentication
schedule.engine.airflow.username=airflow
schedule.engine.airflow.password=airflow
# Connection used to save Inlong Manager authentication information
# Connection used to save InLong Manager authentication information
schedule.engine.airflow.connection.id=inlong_connection
# The ids of the two original DAGs
schedule.engine.airflow.cleaner.id=dag_cleaner
schedule.engine.airflow.creator.id=dag_creator
```

## Offline Synchronization Task Creation
## Task Creation
### Create Synchronization Task

![airflow_create_synchronization_task](img/pulsar_mysql/airflow/airflow_create_synchronization_task.png)
![Airflow Create Synchronization Task](img/pulsar_mysql/airflow/airflow_create_synchronization_task.png)

### Create Data Stream Group
![airflow_data_stream_group](img/pulsar_mysql/airflow/airflow_data_stream_group.png)
![Airflow Data Stream Group](img/pulsar_mysql/airflow/airflow_data_stream_group.png)

Please refer to the following steps: [Use Quartz built-in scheduling engine](./quartz_example.md)
Please refer to the following steps: [Quartz Scheduling Engine Example](./quartz_example.md)
### Create Airflow Offline Task

After approval and configuration, Inlong Manager will trigger `dag_creator` through the Airflow API to create the offline task DAG:
After approval and configuration, InLong Manager will trigger `dag_creator` through the Airflow API to create the offline task DAG:
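Conceptually, this trigger goes through Airflow's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns`). The sketch below only builds such a request with the Python standard library; the base URL, credentials, and `conf` payload are illustrative placeholders, not InLong Manager's actual implementation:

```python
import base64
import json
import urllib.request

def build_dag_trigger_request(base_url, dag_id, conf, username, password):
    # Build (but do not send) a POST to Airflow's stable REST API endpoint
    # /api/v1/dags/{dag_id}/dagRuns, authenticated with HTTP basic auth.
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
    )

# Illustrative values only -- not InLong Manager's real payload.
req = build_dag_trigger_request(
    "http://airflowIP:airflowPort", "dag_creator",
    {"task_name": "inlong_offline_task"}, "airflow", "airflow")
print(req.full_url)  # http://airflowIP:airflowPort/api/v1/dags/dag_creator/dagRuns
```

Sending the request (e.g. with `urllib.request.urlopen`) would then create a run of `dag_creator`, which in turn creates the offline task DAG.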

![airflow_create_task_DAG.png](img/pulsar_mysql/airflow/airflow_create_task_DAG.png)
![Airflow Create Task DAG.png](img/pulsar_mysql/airflow/airflow_create_task_DAG.png)

![airflow_create_task_DAG_result.png](img/pulsar_mysql/airflow/airflow_create_task_DAG_result.png)
![Airflow Create Task DAG Result](img/pulsar_mysql/airflow/airflow_create_task_DAG_result.png)

> Offline task DAG may not be scheduled immediately, because Airflow will scan DAG files regularly and add them to the schedule, so it may take some time.
The offline task execution results are as follows:

![airflow_DAG_task_result.png](img/pulsar_mysql/airflow/airflow_DAG_task_result.png)
![Airflow DAG Task Result](img/pulsar_mysql/airflow/airflow_DAG_task_result.png)

> Airflow will periodically call the interface provided by Inlong Manager to submit Flink tasks according to the configuration in the `Create Data Stream Group` section. This is why the authentication information of Inlong Manager needs to be saved in the `Inlong Manager Configuration` section.
> Airflow will periodically call the interface provided by InLong Manager to submit Flink tasks according to the configuration in the `Create Data Stream Group` section. This is why the authentication information of InLong Manager needs to be saved in the `Configure InLong Manager` section.
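The `inlong_connection` mentioned above is an ordinary Airflow Connection. One way to create it is the `airflow connections add` CLI — a sketch only; the host, port, and credentials below are placeholders for your own InLong Manager:

```shell
# Store InLong Manager's address and credentials as an Airflow Connection
# named inlong_connection, matching schedule.engine.airflow.connection.id.
# 8083 is assumed here as the Manager port -- adjust to your deployment.
airflow connections add inlong_connection \
    --conn-type http \
    --conn-host inlongManagerIp \
    --conn-port 8083 \
    --conn-login admin \
    --conn-password inlong
```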
## Test Data
### Send Data
### Sending Data

Use the Pulsar SDK to produce data into the Pulsar topic. An example is as follows:

The example of using Pulsar SDK to write production data to a Pulsar topic is as follows:
```java
// Create pulsar client and producer
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
@@ -118,8 +126,8 @@ for (int i = 0; i < 10000; i++) {
}
```

### Data Verification
### Data Validation

Then enter Mysql and check the database table data. You can see that the data has been synchronized to MySQL.
Then enter MySQL to check the data in the table:

![airflow_synchronization_result](img/pulsar_mysql/airflow/airflow_synchronization_result.png)
![Airflow Synchronization Result](img/pulsar_mysql/airflow/airflow_synchronization_result.png)
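As a quick sanity check — a sketch assuming the `sink_table` target created earlier — you can count the synchronized rows in MySQL:

```mysql
-- Row count and most recent write; the count should grow toward the
-- number of messages produced to the Pulsar topic.
SELECT COUNT(*) AS row_count, MAX(create_time) AS latest_write FROM sink_table;
```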
49 changes: 42 additions & 7 deletions docs/quick_start/offline_data_sync/dolphinscheduler_example.md
@@ -1,9 +1,9 @@
---
title: Use DolphinScheduler third-party scheduling engine
title: DolphinScheduler Scheduling Engine Example
sidebar_position: 2
---

In the following content, we will introduce how to use DolphinScheduler, a third-party schedule engine in Apache InLong to create offline data synchronization.
In the following sections, we will walk through a complete example to demonstrate how to integrate the third-party scheduling engine DolphinScheduler into Apache InLong to create an offline data synchronization from Pulsar to MySQL.

## Deployment

@@ -20,10 +20,40 @@ Download the [connectors](https://inlong.apache.org/downloads/) corresponding to

> Currently, Apache InLong's offline data synchronization capability only supports Flink-1.18, so please download the 1.18 version of connectors.
### Operations on DolphinScheduler
## Create Clusters And Data Target
When all containers are successfully started, you can access the InLong dashboard at `http://localhost` and log in with the following default account.
```
User: admin
Password: inlong
```

### Create Cluster Tag
![DolphinScheduler Create Cluster Tag](img/pulsar_mysql/dolphinscheduler/ds_create_cluster_tag.png)

### Register Pulsar Cluster

![DolphinScheduler Create Pulsar Cluster](img/pulsar_mysql/dolphinscheduler/ds_create_pulsar_cluster.png)

### Create Data Target

![DolphinScheduler Create Data Target](img/pulsar_mysql/dolphinscheduler/ds_create_data_target.png)

Execute the following SQL statement:

```mysql
CREATE TABLE sink_table (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

## DolphinScheduler Initialize
### Deploy DolphinScheduler

Before using DolphinScheduler as your scheduling engine, please make sure you have a working DolphinScheduler deployment on hand. If you need to deploy DolphinScheduler yourself, please refer to the [DolphinScheduler Official Documentation](https://dolphinscheduler.apache.org/zh-cn).

### Get DolphinScheduler token
![DolphinScheduler Security](img/pulsar_mysql/dolphinscheduler/ds_security.png)

![DolphinScheduler Token Manager](img/pulsar_mysql/dolphinscheduler/ds_token_manager.png)
@@ -36,7 +66,7 @@ Set parameters for the token according to the steps in the figure, include [Expi

![DolphinScheduler Token Copy](img/pulsar_mysql/dolphinscheduler/ds_token_copy.png)

### Modify configuration in InLong Manager
### Configure InLong Manager

For a third-party scheduling engine, we need to modify the configuration in InLong Manager.

@@ -50,13 +80,19 @@ For DolphinScheduler engine there are following configurations need to be modifi

After doing this, restart the InLong Manager to ensure the configuration is enabled.

### Use DolphinScheduler in offline synchronization
## Task Creation
### Create Synchronization Task
![DolphinScheduler Create Synchronization Task](img/pulsar_mysql/dolphinscheduler/ds_create_synchronization_task.png)

### Create Data Stream Group

When configuring the offline synchronization task, choose DolphinScheduler as the scheduling engine, then configure the other parameters.

![DolphinScheduler Task Configuration](img/pulsar_mysql/dolphinscheduler/ds_task_conf.png)

For details about how to manage clusters and configure data nodes, see [Use Quartz built-in scheduling engine](quartz_example.md).
For details about how to manage clusters and configure data nodes, see [Quartz Scheduling Engine Example](quartz_example.md).

### Create DolphinScheduler Offline Task

After the data flow is approved, return to the [Synchronization] page and wait for the task configuration to succeed. Once configured successfully, DolphinScheduler will periodically call back InLong to synchronize offline data, and the Manager will periodically submit Flink batch jobs to the Flink cluster.

@@ -71,7 +107,6 @@ View the DolphinScheduler task instance logs. The following logs indicate that t
![DolphinScheduler Schedule Success](img/pulsar_mysql/dolphinscheduler/ds_schedule_success.png)

## Test Data

### Sending Data

Use the Pulsar SDK to produce data into the Pulsar topic. An example is as follows:
4 changes: 2 additions & 2 deletions docs/quick_start/offline_data_sync/quartz_example.md
@@ -1,9 +1,9 @@
---
title: Use Quartz built-in scheduling engine
title: Quartz Scheduling Engine Example
sidebar_position: 1
---

In the following content, we will introduce how to use built-in schedule engine in Apache InLong to create offline data synchronization from Pulsar to MySQL through a complete example.
In the following sections, we will walk through a complete example to demonstrate how to use Apache InLong's built-in scheduling engine (Quartz) to create an offline data synchronization from Pulsar to MySQL.

## Deployment
### Install InLong
@@ -1,8 +1,9 @@
---
title: Airflow Offline Synchronization Example
title: Airflow Scheduling Engine Example
sidebar_position: 3
---
In the following content, a complete example will be used to introduce how to create Airflow scheduling tasks using Apache InLong and complete offline data synchronization from Pulsar to MySQL.

In the following sections, we will walk through a complete example to demonstrate how to integrate the third-party scheduling engine Airflow into Apache InLong to create an offline data synchronization from Pulsar to MySQL.

## Deployment
### Install InLong
@@ -13,21 +14,25 @@ sidebar_position: 3

### Add Connectors

Download the [connectors](https://inlong.apache.org/zh-CN/downloads) corresponding to the Flink version; after decompression, place `sort-connector-jdbc-[version]-SNAPSHOT.jar` in the `/inlong-sort/connectors/` directory.
> Currently, Apache InLong's offline data synchronization capability only supports Flink-1.18, so please download the 1.18 version of connectors.
## Create Clusters And Data Target

After the InLong services start, you can access the InLong Dashboard at `http://localhost` and log in with the following default account:
```
User: admin
Password: inlong
```
### Create Cluster Tag
![airflow_create_cluster_labels](img/pulsar_mysql/airflow/airflow_create_cluster_labels.png)
![Airflow Create Cluster Tag](img/pulsar_mysql/airflow/airflow_create_cluster_tag.png)

### Register Pulsar Cluster

![airflow_create_pulsar_cluster](img/pulsar_mysql/airflow/airflow_create_pulsar_cluster.png)
![Airflow Create Pulsar Cluster](img/pulsar_mysql/airflow/airflow_create_pulsar_cluster.png)

### Create Data Target

![airflow_create_data_target](img/pulsar_mysql/airflow/airflow_create_data_target.png)
![Airflow Create Data Target](img/pulsar_mysql/airflow/airflow_create_data_target.png)

Execute the following SQL statement:

@@ -43,67 +48,66 @@ CREATE TABLE sink_table (

### Get Original DAG

`dag_creator` and `dag_cleaner` can be obtained from [Inlong](https://github.com/apache/inlong).
`dag_creator` and `dag_cleaner` can be obtained from [InLong](https://github.com/apache/inlong).

![airflow_get_DAGs](img/pulsar_mysql/airflow/airflow_get_DAGs.jpg)
![Airflow Get DAGs](img/pulsar_mysql/airflow/airflow_get_DAGs.jpg)

> Airflow does not provide an API for DAG creation, so two original DAGs are required. `dag_creator` is used to create offline tasks, and `dag_cleaner` is used to periodically clean up offline tasks.
### Create Original DAG

First, place the DAG files in Airflow's default DAG directory and wait for a while; the Airflow scheduler will scan the directory and load the DAGs:

![airflow_original_DAG](img/pulsar_mysql/airflow/airflow_original_DAG.png)
![Airflow Original DAGs](img/pulsar_mysql/airflow/airflow_original_DAG.png)

### Airflow REST API

By default, Airflow will reject all REST API requests. Please refer to the [Airflow official documentation](https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/api-authentication.html) for configuration.

### Inlong Manager Configuration
### Configure InLong Manager

Modify the configuration file as required below and restart Inlong Manager
Modify the configuration file as required below and restart InLong Manager.

```properties
# Inlong Manager URL accessible by Airflow
# InLong Manager URL accessible by Airflow
schedule.engine.inlong.manager.url=http://inlongManagerIp:inlongManagerPort
# Management URL for Airflow
schedule.engine.airflow.baseUrl=http://airflowIP:airflowPort
# Username and password for Airflow REST API authentication
schedule.engine.airflow.username=airflow
schedule.engine.airflow.password=airflow
# Connection used to save Inlong Manager authentication information
# Connection used to save InLong Manager authentication information
schedule.engine.airflow.connection.id=inlong_connection
# The ids of the two original DAGs
schedule.engine.airflow.cleaner.id=dag_cleaner
schedule.engine.airflow.creator.id=dag_creator
```

## Offline Synchronization Task Creation

## Task Creation
### Create Synchronization Task

![airflow_create_ynchronization_task](img/pulsar_mysql/airflow/airflow_create_ynchronization_task.png)
![Airflow Create Synchronization Task](img/pulsar_mysql/airflow/airflow_create_synchronization_task.png)

### Create Data Stream Group
![airflow_data_stream_group](img/pulsar_mysql/airflow/airflow_data_stream_group.png)
![Airflow Data Stream Group](img/pulsar_mysql/airflow/airflow_data_stream_group.png)

For the subsequent steps, please refer to: [Use Quartz built-in scheduling engine](./quartz_example.md)
For the subsequent steps, please refer to: [Quartz Scheduling Engine Example](./quartz_example.md)

### Create Airflow Offline Task

After approval and successful configuration, Inlong Manager will trigger `dag_creator` through the Airflow API to create the offline task DAG
After approval and successful configuration, InLong Manager will trigger `dag_creator` through the Airflow API to create the offline task DAG:

![airflow_create_task_DAG.png](img/pulsar_mysql/airflow/airflow_create_task_DAG.png)
![Airflow Create Task DAG](img/pulsar_mysql/airflow/airflow_create_task_DAG.png)

![airflow_create_task_DAG_result.png](img/pulsar_mysql/airflow/airflow_create_task_DAG_result.png)
![Airflow Create Task DAG result](img/pulsar_mysql/airflow/airflow_create_task_DAG_result.png)

> The offline task DAG may not be scheduled immediately, because Airflow scans DAG files periodically before adding them to the schedule, so it may take some time.
The offline task execution results are as follows:

![airflow_DAG_task_result.png](img/pulsar_mysql/airflow/airflow_DAG_task_result.png)
![Airflow DAG Task Result](img/pulsar_mysql/airflow/airflow_DAG_task_result.png)

> Airflow will periodically call the interface provided by Inlong Manager to submit Flink tasks according to the configuration in the `Create Data Stream Group` section. This is also why the authentication information of Inlong Manager needs to be saved in the `Inlong Manager Configuration` section.
> Airflow will periodically call the interface provided by InLong Manager to submit Flink tasks according to the configuration in the `Create Data Stream Group` section. This is also why the authentication information of InLong Manager needs to be saved in the `Configure InLong Manager` section.
## Test Data
### Send Data
@@ -127,4 +131,4 @@ for (int i = 0; i < 10000; i++) {

Then enter MySQL and check the table data; you can see that the data has been synchronized to MySQL.

![airflow_synchronization_result](img/pulsar_mysql/airflow/airflow_synchronization_result.png)
![Airflow Synchronization Result](img/pulsar_mysql/airflow/airflow_synchronization_result.png)