Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • Java核心基础

  • 框架的艺术

  • 分布式与微服务

    • SpringCloud

    • SpringCloudAlibaba

    • Dubbo

    • 认证授权

    • 任务调度

      • 为什么要引入分布式任务调度系统?
      • 为什么说datax是目前最好的异构数据源数据交换工具
      • 如何开发一个自己的datax插件
        • (一)概述
        • (二)开发一个HttpReader
        • (三)总结
  • 开发经验大全

  • 版本新特性

  • Java
  • 分布式与微服务
  • 任务调度
CodeEase
2024-01-02
目录

如何开发一个自己的datax插件

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)概述

DataX采用FrameWork+plugin的方式,插件只需关心数据的读取或者写入本身。而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。在写插件前官方建议先看一遍开发文档。

Datax开发文档:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

本文将带大家一起开发一个读 Http 接口的 Reader。

# (二)开发一个HttpReader

1、从git上将项目clone下来 https://github.com/alibaba/DataX

2、新建一个module,选最基本的maven项目,项目名定义为xxxreader或者xxxwriter

3、从任意模块里将pom.xml中的以及中的内容复制,删除无用的依赖,引入新模块需要的依赖

4、将任何一个相同模块(reader或者writer)里的resources中的plugin.json和plugin_job_template.json复制到resources目录下

5、修改plugin.json,主要修改name和class

6、修改plugin_job_template.json,这是datax脚本的结构

7、将任何一个相同模块(reader或者writer)里的assembly复制过来,修改下述内容

8、开始编码

编码之前先看一遍DataX开发文档,再结合已有的插件开发就没什么大问题。下面这段HttpReader实现读取接口中的数据,并作为后续Writer的输入,核心代码如下:

public class HttpReader extends Reader {
    public static class Job extends Reader.Job{
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);

        Configuration pluginJobConf=null;
        @Override
        public void init() {
            this.pluginJobConf = super.getPluginJobConf();
            LOG.info("pluginJobConfig",pluginJobConf);
        }
        @Override
        public List<Configuration> split(int adviceNumber) {
            LOG.info("adviceNumber:{}",adviceNumber);
            List<Configuration> configurations = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                configurations.add(this.pluginJobConf.clone());
            }
            return configurations;
        }
        @Override
        public void destroy() {
        }
    }

    public static class Task extends Reader.Task{
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private Configuration configuration;
        private String httpUrl=null;
        private String httpType=null;
        private String httpParamJson=null;
        private List<String> dataColumn=null;

        @Override
        public void init() {
            this.configuration = super.getPluginJobConf();
            this.httpUrl = this.configuration.getString("httpUrl");
            this.httpType = this.configuration.getString("httpType");
            this.httpParamJson = this.configuration.getString("httpParamJson");
            this.dataColumn = this.configuration.getList("dataColumn",String.class);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            LOG.info("------------------- http reader log -----------------------");
            LOG.info(this.httpUrl);
            LOG.info(this.httpType);
            LOG.info(this.httpParamJson);
            //调用获取
            String result=null;
            try {
                if (Key.GET.equalsIgnoreCase(this.httpType)){
                    //get请求调用
                    result = HttpUtil.get(this.httpUrl);
                }else if (Key.POST.equalsIgnoreCase(this.httpType)){
                    //post请求调用
                    result = HttpUtil.post(this.httpUrl,this.httpParamJson,60);
                }else{
                    throw DataXException.asDataXException(HttpReaderErrorCode.NOT_SUPPORTED_ERROR,"该方法暂时不支持");
                }
            }catch (Exception e){
               throw DataXException.asDataXException(HttpReaderErrorCode.REQUEST_CALL_FAILED,"请求调用失败");
            }
            Record record=null;
            //将结果塞到record中
            JSONObject jsonResult = JSON.parseObject(result);
            JSONArray array = jsonResult.getJSONArray("data");
            for (int i = 0; i < array.size(); i++) {
                JSONObject jsonObject = array.getJSONObject(i);
                record=recordSender.createRecord();
                for (String data:this.dataColumn){
                    record.addColumn(new StringColumn(jsonObject.getString(data)));
                }
                recordSender.sendToWriter(record);
            }

            recordSender.flush();
        }
        @Override
        public void destroy() {

        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

9、编码完成后,通过maven打包

根目录的pom.xml的module去掉其他的reader和writer,留下common、core、transformer和刚刚写的httpReader

执行命令:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true
1

10、本地启动

对core文件夹下的Engine类配置启动参数即可直接在本地调用DataX插件

-Ddatax.home=/Users/ly/IdeaProjects/DataX-master/target/datax/datax

-job /Users/ly/IdeaProjects/DataX-master/http2stream.json -jobid 1

/Users/ly/IdeaProjects/DataX-master
1
2
3
4
5

11、配置脚本

配置一个从httpReader读,控制台输出的脚本,读取接口中key为a1、a2、a3的数据

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "httpUrl": "http://127.0.0.1:8081/test",
            "httpType": "POST",
            "httpParamJson":"{\"aa\":\"aa\"}",
            "dataColumn": [
              "a1","a2","a3"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

在本地起一个简单的测试接口:

@PostMapping("test")
public String test() {
    String jsonResult = "{\n" +
            "    \"data\":[\n" +
            "        {\n" +
            "            \"a1\":\"a11\"\n" +
            "        },\n" +
            "        {\n" +
            "            \"a2\":\"a22\"\n" +
            "        },\n" +
            "        {\n" +
            "            \"a3\":\"a333\"\n" +
            "        }\n" +
            "    ]\n" +
            "}";
    return jsonResult;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

调用成功获取到接口中的数据:

# (三)总结

至此,一个简单的 Datax 插件就开发完成了。可以看出 DataX 的扩展性还是很强的,官网上的这些插件自己基本上都能写出来,值得大家去学习一下。

上次更新: 2025/02/18, 11:30:08
为什么说datax是目前最好的异构数据源数据交换工具
如何用Java写一个规范的http接口?

← 为什么说datax是目前最好的异构数据源数据交换工具 如何用Java写一个规范的http接口?→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式