DataX使用总结_datax where-程序员宅基地

技术标签: DataX  MySQL  

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github,地址:https://github.com/alibaba/DataX。

 

DataX安装部署


1.下载压缩包:

下载页面地址:https://github.com/alibaba/DataX 在页面中【Quick Start】--->【Download DataX下载地址】进行下载。下载后的包名:datax.tar.gz。解压后{datax}目录下有{bin conf job lib log log_perf plugin script tmp}几个目录。
2.安装

将下载后的压缩包直接解压后可用,前提是对应的java及python环境满足要求。
    JDK(1.6以上,推荐1.6)
    Python(推荐Python2.6.X)一定要为python2,因为后面执行datax.py的时候,里面的python的print会执行不了,导致运行不成功,会提示你print语法要加括号,python2中加不加都行 python3中必须要加,否则报语法错

另外要注意, 不要解压到C:\Program Files目录下或其他名字带空格的目录下,因为在cmd执行时会因为路径有空格导致找不到程序主文件。

DataX支持绝大部分种类数据库的数据转移,其数据转移的主要流程有三步:Reader -->transform-->writer

reader 从数据库读取需要转移的数据,transform在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,这些工作在这里进行,writer将读取并处理后的数据写入目标数据库。

DataX配置文件

DataX是通过读取配置文件进行数据转移,配置文件为json格式,这里以MySQL做示范,MySQL 2 MySQL

{
    "job": {
        
        "content": [
                    //reader过程配置信息
            {
                "reader": {   
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",//数据库用户名
                        "password": "root",
                        "column": [
                            "id",        //要读取的列元素 list
                            "name"
                        ],
                         "where":"",//可以添加筛选条件
                        "splitPk": "db_id",//数据分片
                        "connection": [
                            {
                                "table": [
                                    "table"//要读取的表名list,支持多个表的读取
                                ],
                                "querySql":[     //自定义筛选SQL
                                    "select reflect.id as id from user ,reflect where user.id = reflect.user_id",
                                ]
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database" //要读取的数据库URL 可以加数据库配置信息后缀
                                ]
                            }
                        ]
                    }
                },

                "writer": {  //writer过程配置信息
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",    //要写入的列list 注意要与读取的列一致
                        "name"
                        ],
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://120.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "root", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
            }
        ]
    }
}
  • 数据分片:如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。官方文档推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点,要注意目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

  • Where:添加where配置可以对要转移的数据进行筛选,比如可以选择只转移今天的数据,"where":"gmt_create > $bizdate "

MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据读取

querySql:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

类型转换

DataX 内部类型 Mysql 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

贴上我的一个测试数据转移配置

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "querySql":[
                                    "select reflect.id as id,reflect.user_id as user_id,user.mail as comment from user ,reflect where user.id = reflect.user_id",
                                ]
                            }
                        ], 
                        "password": "xxxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",//必选 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句 insert/replace/update  默认insert
                        "column": [
                        "id",
                        "user_id",
                        "comment"
                        ],
                        "preSql":"",   //可选 插入数据前执行的SQL
                        "postSql":"",  //可选 插入数据成功后执行的SQL
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

用到了连表查询信息然后作为一个字段值转移到目标数据库,这里要注意查询结果的列名和顺序要和writer里column里一样。

writeMode

  • 描述:控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句

  • 必选:是

  • 所有选项:insert/replace/update

  • 默认值:insert

启动命令

CMD下 进到你解压的dataX/bin目录下

python  datax.py  ..\job\mysql2mysql.json 

Transformer

下面说一下transform的使用,transform用于对读取的数据进行特殊定制化的需求场景,包括裁剪列、转换列等工作,主要使用的是五个对数据进行处理的方法,分别是dx_substr(),dx_pad(),dx_replace(),dx_filter(),dx_groovy(),官方文档对这几个方法的解释如下: 

dx_substr

参数:3个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。

返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"

dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

dx_pad

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。

返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符

举例:

         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz

         dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

dx_replace

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。

返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"

dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"data****"

dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)

参数:

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。

返回:

    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。

举例:

dx_filter(1,"like","dataTest") 

dx_filter(1,">=","10") 

dx_groovy

参数。

    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage

备注:

    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):

 transform job示例:

"transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": 
                            {
                            "columnIndex":5,
                            "paras":["1","3"]
                            }  
                    },
                    {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":4,
                            "paras":["3","4","****"]
                            }  
                    },
                    {
                        "name": "dx_groovy",
                          "parameter": 
                            {
                               "code": "//groovy code//",  
                               "extraPackage":[
                               "import somePackage1;", 
                               "import somePackage2;"
                               ]                      
                            }  
                    }
                ]

要注意  Reader  Writer 和Transform 的配置都要写在content下面,Reader和writer的配置是用大括号,transform是写在[]里,因为transform可以包括多个数据处理,下面贴上一个包括reader writer transform Job ,

{
    "job": {

        "setting": {
            "speed": {
                "channel": "1"
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",
                        "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                },
                "transformer":[
                    {
                        "name":"dx_substr",
                        "parameter": 
                            {
                            "columnIndex":3,
                            "paras":["1","3"]
                            }  
                    },
                     {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":2,
                            "paras":["1","4","****"]
                            }  
                    }
                ]
            }
        ]

    }
}

打开CMD 执行命令

结果:

可以看到读取到两条符合筛选条件的数据并全部写入,transform两条数据全部成功,然后看一下数据库:

被读取数据库数据:

写入目标数据库结果:

可以看到第三条数据经过了dx_replace()替换,第四条经过了dx_substr()裁剪,数据转移成功。

PS:CMD中文乱码问题  使用HCP 65001即可

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_42050545/article/details/93883887

智能推荐

18个顶级人工智能平台-程序员宅基地

文章浏览阅读1w次,点赞2次,收藏27次。来源:机器人小妹  很多时候企业拥有重复,乏味且困难的工作流程,这些流程往往会减慢生产速度并增加运营成本。为了降低生产成本,企业别无选择,只能自动化某些功能以降低生产成本。  通过数字化..._人工智能平台

electron热加载_electron-reloader-程序员宅基地

文章浏览阅读2.2k次。热加载能够在每次保存修改的代码后自动刷新 electron 应用界面,而不必每次去手动操作重新运行,这极大的提升了开发效率。安装 electron 热加载插件热加载虽然很方便,但是不是每个 electron 项目必须的,所以想要舒服的开发 electron 就只能给 electron 项目单独的安装热加载插件[electron-reloader]:// 在项目的根目录下安装 electron-reloader,国内建议使用 cnpm 代替 npmnpm install electron-relo._electron-reloader

android 11.0 去掉recovery模式UI页面的选项_android recovery 删除 部分菜单-程序员宅基地

文章浏览阅读942次。在11.0 进行定制化开发,会根据需要去掉recovery模式的一些选项 就是在device.cpp去掉一些选项就可以了。_android recovery 删除 部分菜单

mnn linux编译_mnn 编译linux-程序员宅基地

文章浏览阅读3.7k次。https://www.yuque.com/mnn/cn/cvrt_linux_mac基础依赖这些依赖是无关编译选项的基础编译依赖• cmake(3.10 以上)• protobuf (3.0 以上)• 指protobuf库以及protobuf编译器。版本号使用 protoc --version 打印出来。• 在某些Linux发行版上这两个包是分开发布的,需要手动安装• Ubuntu需要分别安装 libprotobuf-dev 以及 protobuf-compiler 两个包•..._mnn 编译linux

利用CSS3制作淡入淡出动画效果_css3入场效果淡入淡出-程序员宅基地

文章浏览阅读1.8k次。CSS3新增动画属性“@-webkit-keyframes”,从字面就可以看出其含义——关键帧,这与Flash中的含义一致。利用CSS3制作动画效果其原理与Flash一样,我们需要定义关键帧处的状态效果,由CSS3来驱动产生动画效果。下面讲解一下如何利用CSS3制作淡入淡出的动画效果。具体实例可参考刚进入本站时的淡入效果。1. 定义动画,名称为fadeIn@-webkit-keyf_css3入场效果淡入淡出

计算机软件又必须包括什么,计算机系统应包括硬件和软件两个子系统,硬件和软件又必须依次分别包括______?...-程序员宅基地

文章浏览阅读2.8k次。计算机系统应包括硬件和软件两个子系统,硬件和软件又必须依次分别包括中央处理器和系统软件。按人的要求接收和存储信息,自动进行数据处理和计算,并输出结果信息的机器系统。计算机是脑力的延伸和扩充,是近代科学的重大成就之一。计算机系统由硬件(子)系统和软件(子)系统组成。前者是借助电、磁、光、机械等原理构成的各种物理部件的有机组合,是系统赖以工作的实体。后者是各种程序和文件,用于指挥全系统按指定的要求进行..._计算机系统包括硬件系统和软件系统 软件又必须包括

随便推点

进程调度(一)——FIFO算法_进程调度fifo算法代码-程序员宅基地

文章浏览阅读7.9k次,点赞3次,收藏22次。一 定义这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单,只需把一个进程已调入内存的页面,按先后次序链接成一个队列,并设置一个指针,称为替换指针,使它总是指向最老的页面。但该算法与进程实际运行的规律不相适应,因为在进程中,有些页面经常被访问,比如,含有全局变量、常用函数、例程等的页面,FIFO 算法并不能保证这些页面不被淘汰。这里,我_进程调度fifo算法代码

mysql rownum写法_mysql应用之类似oracle rownum写法-程序员宅基地

文章浏览阅读133次。rownum是oracle才有的写法,rownum在oracle中可以用于取第一条数据,或者批量写数据时限定批量写的数量等mysql取第一条数据写法SELECT * FROM t order by id LIMIT 1;oracle取第一条数据写法SELECT * FROM t where rownum =1 order by id;ok,上面是mysql和oracle取第一条数据的写法对比,不过..._mysql 替换@rownum的写法

eclipse安装教程_ecjelm-程序员宅基地

文章浏览阅读790次,点赞3次,收藏4次。官网下载下载链接:http://www.eclipse.org/downloads/点击Download下载完成后双击运行我选择第2个,看自己需要(我选择企业级应用,如果只是单纯学习java选第一个就行)进入下一步后选择jre和安装路径修改jvm/jre的时候也可以选择本地的(点后面的文件夹进去),但是我们没有11版本的,所以还是用他的吧选择接受安装中安装过程中如果有其他界面弹出就点accept就行..._ecjelm

Linux常用网络命令_ifconfig 删除vlan-程序员宅基地

文章浏览阅读245次。原文链接:https://linux.cn/article-7801-1.htmlifconfigping &lt;IP地址&gt;:发送ICMP echo消息到某个主机traceroute &lt;IP地址&gt;:用于跟踪IP包的路由路由:netstat -r: 打印路由表route add :添加静态路由路径routed:控制动态路由的BSD守护程序。运行RIP路由协议gat..._ifconfig 删除vlan

redux_redux redis-程序员宅基地

文章浏览阅读224次。reduxredux里要求把数据都放在公共的存储区域叫store里面,组件中尽量少放数据,假如绿色的组件要给很多灰色的组件传值,绿色的组件只需要改变store里面对应的数据就行了,接着灰色的组件会自动感知到store里的数据发生了改变,store只要有变化,灰色的组件就会自动从store里重新取数据,这样绿色组件的数据就很方便的传到其它灰色组件里了。redux就是把公用的数据放在公共的区域去存..._redux redis

linux 解压zip大文件(解决乱码问题)_linux 7za解压中文乱码-程序员宅基地

文章浏览阅读2.2k次,点赞3次,收藏6次。unzip版本不支持4G以上的压缩包所以要使用p7zip:Linux一个高压缩率软件wget http://sourceforge.net/projects/p7zip/files/p7zip/9.20.1/p7zip_9.20.1_src_all.tar.bz2tar jxvf p7zip_9.20.1_src_all.tar.bz2cd p7zip_9.20.1make && make install 如果安装失败,看一下报错是不是因为没有下载gcc 和 gcc ++(p7_linux 7za解压中文乱码