Fork me on GitHub

datax执行流程

datax执行流程
1. 首先开启一个job对源数据连接进行检查,判断是否是通的,检查源表有哪些字段,开启job容器,
readerJob,writerJob,job容器分割为小的task,job设置通道大小N,查询分割主键的最小值和最大值,分割后的所有的查询sql,job容器开启调度
2. 调度开启一个taskGroup,task容器为所有的task开启N个通道,异步的执行各个task
3.所有的任务完成后开始post操作,WriterJobReaderJob都进行post操作

datax监控读写记录的方式:
采用一个消息件Communication
job容器和taskgroup容器之间通过communication传递读写任务执行的情况
jobId绑定一个communication taskgroupId绑定一个communication
定时的taskgroup容器会将自己采集到的数据汇报给job容器
core.json 汇报时间设置为1s 原本是10s
job容器等候1s获得汇报者采集的数据
job容器初始化消息件,消息件初始化会设置采集器和汇报者
job容器的消息件是将所有的taskgroup的消息件合并在一起
job容器和taskgroup容器继承同一个抽象容器使用它的containerCommunicator变量进行汇报
taskgroupRunner开启一个任务执行taskgroup容器start方法,在taskgroup容器中根据配置文件注册task消息件,注册的同时将taskId和消息件绑定到taskCommunicationMap
taskcommunicationtask消息件用于在WriterReader中统计读写记录情况,taskgroup容器初始化每个任务的taskcommunication消息件
taskExecutor代表任务执行器,它根据task配置初始化task taskcommunication channel,为绑定的channel设置相应的taskcommunication缓冲区com.alibaba.datax.core.plugin.BufferedRecordExchanger进行数据交换的大小为32条记录

Channel中读取数据record时最后会读取到一个结束标识record所以在统计成功的写数据到es时应该减去1 datax框架依赖jar包A,datax的插件依赖jar包A会导致:如果框架已经加载了jar包A,插件再去加载会导致当前加载器不是使用插件的加载器而是使用父类的加载器.可以在插件加载某个jar包时指定加载器


最新评论

    还没有人评论...

当当

友情链接

Powered by Python. Copyright © 2017.

鄂ICP备17010875号. All rights reserved.