Commit 8d250a6 — [INLONG-1085][Doc] Add documents for Airflow offline scheduler (#1089)
Zkplo authored Dec 5, 2024 · 1 parent fb20d83
Showing 72 changed files with 318 additions and 47 deletions.

docs/quick_start/offline_data_sync/airflow_pulsar_mysql_example.md (new file, +125 lines)
---
title: Example of Airflow Offline Synchronization
sidebar_position: 3
---
The following walks through a complete example of how to create Airflow scheduling tasks with Apache InLong and complete an offline data synchronization from Pulsar to MySQL.

## Deployment
### Install InLong

Before we begin, we need to install all InLong components. Two deployment options are provided here:
- [Docker Deployment](deployment/docker.md) (Recommended)
- [Bare Metal Deployment](deployment/bare_metal.md)

### Add Connectors

Download the [connectors](https://inlong.apache.org/downloads/) corresponding to your Flink version, and after decompression, place `sort-connector-jdbc-[version]-SNAPSHOT.jar` in the `/inlong-sort/connectors/` directory.

> Currently, Apache InLong's offline data synchronization capability only supports Flink 1.18, so please download the 1.18 version of the connectors.

## Create Clusters and Data Target

### Create Cluster Label
![airflow_create_cluster_labels](img/pulsar_mysql/airflow/airflow_create_cluster_labels.png)

### Register Pulsar Cluster

![airflow_create_pulsar_cluster](img/pulsar_mysql/airflow/airflow_create_pulsar_cluster.png)

### Create Data Target

![airflow_create_data_target](img/pulsar_mysql/airflow/airflow_create_data_target.png)

Execute the following SQL statement:

```mysql
CREATE TABLE sink_table (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

## Airflow Initialization

### Get Initial DAG

The `dag_creator` and `dag_cleaner` DAG files can be obtained from the [InLong](https://github.com/apache/inlong) repository.

![airflow_get_DAGs](img/pulsar_mysql/airflow/airflow_get_DAGs.jpg)

> Airflow does not provide an API for creating DAGs, so two original DAGs are required: `dag_creator` is used to create offline tasks, and `dag_cleaner` is used to clean them up periodically.

### Create Initial DAG

Place the DAG file in the Airflow default DAG directory and wait for a while. The Airflow scheduler will scan the directory and load the DAG:
![airflow_original_DAG](img/pulsar_mysql/airflow/airflow_original_DAG.png)

### Airflow REST API

By default, Airflow will reject all REST API requests. Please refer to the [Airflow official documentation](https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/api-authentication.html) for configuration.
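As a sketch, with the FAB auth manager you can enable password-based API calls by adding the basic-auth backend in `airflow.cfg` (the exact backend list depends on your Airflow version and auth manager):

```ini
[api]
auth_backends = airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
```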

### Inlong Manager Configuration

Modify the Inlong Manager configuration file as required below, then restart Inlong Manager.
```properties
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://inlongManagerIp:inlongManagerPort
# Management URL for Airflow
schedule.engine.airflow.baseUrl=http://airflowIP:airflowPort
# Username and password for Airflow REST API authentication
schedule.engine.airflow.username=airflow
schedule.engine.airflow.password=airflow
# Connection used to save Inlong Manager authentication information
schedule.engine.airflow.connection.id=inlong_connection
# The ids of the two original DAGs
schedule.engine.airflow.cleaner.id=dag_cleaner
schedule.engine.airflow.creator.id=dag_creator
```
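Once the REST API is enabled, triggering `dag_creator` amounts to a POST to Airflow's stable REST API endpoint `/api/v1/dags/{dag_id}/dagRuns`. The sketch below builds such a request in Java; the host `localhost:8080` and the `airflow`/`airflow` credentials are assumptions matching the properties above, and the request is only constructed, not sent, since sending requires a live Airflow instance:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class TriggerDagExample {

    // Build a POST /api/v1/dags/{dagId}/dagRuns request with HTTP Basic auth
    static HttpRequest buildTriggerRequest(String baseUrl, String user, String password, String dagId) {
        String auth = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/dags/" + dagId + "/dagRuns"))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/json")
                // Payload for the new DAG run; Inlong Manager passes the task config here
                .POST(HttpRequest.BodyPublishers.ofString("{\"conf\": {}}"))
                .build();
    }

    public static void main(String[] args) {
        // Assumed host and credentials — match the schedule.engine.airflow.* properties above
        HttpRequest request = buildTriggerRequest("http://localhost:8080", "airflow", "airflow", "dag_creator");
        System.out.println(request.method() + " " + request.uri());
        // A java.net.http.HttpClient.send(request, ...) call would actually submit it
    }
}
```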

## Offline Synchronization Task Creation
### Create Synchronization Task

![airflow_create_synchronization_task](img/pulsar_mysql/airflow/airflow_create_synchronization_task.png)
### Create Data Stream Group
![airflow_data_stream_group](img/pulsar_mysql/airflow/airflow_data_stream_group.png)

For the subsequent steps, refer to [Use the Built-in Quartz Scheduling Engine](./quartz_example.md).
### Create Airflow Offline Task

After approval and configuration, Inlong Manager will trigger `dag_creator` through the Airflow API to create the offline task DAG:

![airflow_create_task_DAG.png](img/pulsar_mysql/airflow/airflow_create_task_DAG.png)

![airflow_create_task_DAG_result.png](img/pulsar_mysql/airflow/airflow_create_task_DAG_result.png)

> The offline task DAG may not be scheduled immediately, because Airflow periodically scans DAG files before adding them to the schedule, so this may take some time.

The offline task execution results are as follows:

![airflow_DAG_task_result.png](img/pulsar_mysql/airflow/airflow_DAG_task_result.png)

> Airflow periodically calls the interface provided by Inlong Manager to submit Flink tasks, according to the configuration from the `Create Data Stream Group` section. This is why the Inlong Manager authentication information needs to be saved in the `Inlong Manager Configuration` section.

## Test Data
### Send Data

An example of using the Pulsar SDK to produce data to a Pulsar topic:
```java
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

// Create the Pulsar client and producer
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("public/default/test").create();

// Send messages
for (int i = 0; i < 10000; i++) {
    // The field separator is |
    String msgStr = i + "|msg-" + i;
    MessageId msgId = producer.send(msgStr.getBytes(StandardCharsets.UTF_8));
    System.out.println("Send msg : " + msgStr + " with msgId: " + msgId);
}

// Release resources
producer.close();
pulsarClient.close();
```

### Data Verification

Then open MySQL and check the table data; you can see that the data has been synchronized to MySQL.
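For example, a quick sanity check from the MySQL client (assuming the `test` database used when creating the data target):

```mysql
SELECT COUNT(*) AS total, MAX(create_time) AS latest FROM test.sink_table;
```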

![airflow_synchronization_result](img/pulsar_mysql/airflow/airflow_synchronization_result.png)
(The commit also adds the Chinese (zh-CN) translation of the document above as a second new file: +130 lines, identical in content.)
The commit also updates the Chinese Quartz example document (quartz_example.md). Translated, the diff is:

````diff
@@ -1,9 +1,9 @@
 ---
-title: Pulsar to MySQL Example
+title: Use the Built-in Quartz Scheduling Engine
 sidebar_position: 1
 ---
 
-In the following content, we will walk through a complete example of using Apache InLong to create an offline Pulsar -> MySQL data synchronization.
+In the following content, we will walk through a complete example of using Apache InLong's built-in scheduling engine to create an offline Pulsar -> MySQL data synchronization.
 
 ## Deployment
 ### Install InLong
@@ -18,52 +18,60 @@ sidebar_position: 1
 > Currently, Apache InLong's offline data synchronization capability only supports Flink 1.18, so please download the 1.18 version of the connectors.
 ## Cluster Initialization
-After the InLong services start, you can visit the InLong Dashboard at http://localhost and log in with the following default account:
+After the InLong services start, you can visit the InLong Dashboard at `http://localhost` and log in with the following default account:
 ```
 User: admin
 Password: inlong
 ```
 ### Create Cluster Label
 On the page, click [Cluster Management] → [Label Management] → [Create].
 
-![Create Cluster Tag](img/pulsar_mysql/cluster_tag.png)
+![Create Cluster Tag](img/pulsar_mysql/quartz/cluster_tag.png)
 
 **Note: default_cluster is the default cluster label for each component. If you use another name, make sure the corresponding label configuration has been updated.**
 
 ### Register Pulsar Cluster
 
-![Create Pulsar](img/pulsar_mysql/pulsar.png)
+![Create Pulsar](img/pulsar_mysql/quartz/pulsar.png)
 
 **You can fill in the fields by referring to the screenshot, including the cluster name, the label it belongs to, and the Pulsar cluster address.**
 
 ## Task Creation
 ### Create Data Stream Group
 On the page, click [Data Synchronization] → [Create Data Synchronization], fill in the data stream group ID, and set the synchronization type to "Offline".
 
-![Create Offline Group](img/pulsar_mysql/create_offline_group.png)
+![Create Offline Group](img/pulsar_mysql/quartz/create_offline_group.png)
 
 ### Configure Scheduling Rules
-After the synchronization type is set to "Offline", you can configure the scheduling rules for the offline task. Two kinds are currently supported: normal and crontab.
+After the synchronization type is set to "Offline", you can configure the scheduling rules for the offline task. A scheduling rule consists of two parts: [Schedule Engine] and [Schedule Type].
 
-Normal scheduling requires the following parameters:
-- Schedule unit: minutes, hours, days, months, years, or one-shot; one-shot means the task runs only once
-- Schedule period: the interval between two task runs
-- Delay time: how long the task start is delayed
-- Valid time: a start time and an end time; the scheduled task only runs within this range
+#### Schedule Engine
+Apache InLong provides several schedule engines to choose from. Quartz is the engine built into Apache InLong, and it is used here.
 
-![Create Offline Group](img/pulsar_mysql/normal_schedule.png)
+![Schedule Engine Type](img/pulsar_mysql/quartz/schedule_engine_type.png)
 
-Crontab scheduling requires the following parameters:
-- Valid time: a start time and an end time; the scheduled task only runs within this range
-- Crontab expression: the task period, for example 0 */5 * * * ?
+#### Schedule Type
+Apache InLong currently supports two schedule types: normal and crontab.
 
-![Crontab Schedule](img/pulsar_mysql/cron_schedule.png)
+Normal scheduling requires the following parameters:
+- Schedule unit: minutes, hours, days, months, years, or one-shot; one-shot means the task runs only once.
+- Schedule period: the interval between two task runs.
+- Delay time: how long the task start is delayed.
+- Valid time: a start time and an end time; the scheduled task only runs within this range.
+
+![Create Offline Group](img/pulsar_mysql/quartz/normal_schedule.png)
+
+Crontab scheduling requires the following parameters:
+- Valid time: a start time and an end time; the scheduled task only runs within this range.
+- Crontab expression: the task period, for example `0 */5 * * * ?`
+
+![Crontab Schedule](img/pulsar_mysql/quartz/cron_schedule.png)
 
 ### Create Data Source
 
 In the data source section, click [Create] → [Pulsar] and configure the source name, Pulsar tenant, namespace, topic, admin URL, service URL, data format, and other parameters.
 
-![Create Source](img/pulsar_mysql/source.png)
+![Create Source](img/pulsar_mysql/quartz/source.png)
 
 Note: the Pulsar topic needs to be created in the Pulsar cluster in advance (or enable automatic topic creation in the Pulsar cluster).
 
@@ -80,25 +88,25 @@ CREATE TABLE sink_table (
 
 In the data target section, click [Create] → [MySQL] and configure the target name, database name, and table name (test.sink_table).
 
-![Create Sink](img/pulsar_mysql/sink.png)
+![Create Sink](img/pulsar_mysql/quartz/sink.png)
 
 ### Configure Field Information
 
 Configure the schema mapping in [Source Fields] and [Target Fields] respectively, then click [Submit for Approval].
 
-![Create Source Fields](img/pulsar_mysql/source_field.png)
+![Create Source Fields](img/pulsar_mysql/quartz/source_field.png)
 
-![Create Sink Fields](img/pulsar_mysql/sink_field.png)
+![Create Sink Fields](img/pulsar_mysql/quartz/sink_field.png)
 
 ### Approve the Data Stream
 
 On the page, click [Approval Management] → [My Approvals] → [Details] → [Approve].
 
-![Approve](img/pulsar_mysql/approve.png)
+![Approve](img/pulsar_mysql/quartz/approve.png)
 
 Return to the [Data Synchronization] page and wait for the task to be configured successfully. After that, the Manager will periodically submit Flink batch jobs to the Flink cluster.
 
-![Flink Batch Job](img/pulsar_mysql/flink_batch_job.png)
+![Flink Batch Job](img/pulsar_mysql/quartz/flink_batch_job.png)
 
 ## Test Data
 ### Send Data
@@ -122,4 +130,4 @@ CREATE TABLE sink_table (
 
 Then open MySQL and check the table data; you can see that the data has been synchronized to MySQL.
 
-![Mysql Sink](img/pulsar_mysql/mysql_sink.png)
+![Mysql Sink](img/pulsar_mysql/quartz/mysql_sink.png)
````