大数据时代,数据实时同步解决方案的思考—最全的数据同步总结

  • 时间:
  • 浏览:46
  • 来源:大发快3_快3娱乐_大发快3娱乐

1、 早期关系型数据库之间的数据同步

1)、全量同步

比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法要是 分页查询源端的表,有刚刚通过 jdbc的batch 辦法 插入到目标表,你你这个地方不能 注意的是,分页查询时,一定要按照主键id来排序分页,补救重复插入。

2)、基于数据文件导出和导入的全量同步,你你这个同步辦法 一般只适用于同种数据库之间的同步,之后是不同的数据库,你你这个辦法 以全是处在问題。

3)、基于触发器的增量同步

增量同步一般是做实时的同步,早期什么都有数据同步全是基于关系型数据库的触发器trigger来做的。

使用触发器实时同步数据的步骤:

A、 基于原表创触发器,触发器涵盖insert,modify,delete 五种类型的操作,数据库的触发器分Before和After五种状况,五种是在insert,modify,delete 五种类型的操作处在之后触发(比如记录日志操作,一般是Before),五种是在insert,modify,delete 五种类型的操作之后触发。

B、 创建增量表,增量表中的字段和原表中的字段删剪一样,有刚刚不能 多另一个 操作类型字段(分表代表insert,modify,delete 五种类型的操作),有刚刚不能 另一个 唯一自增ID,代表数据原表中数据操作的顺序,你你这个自增id非常重要,不然数据同步就会错乱。

C、 原表中出現insert,modify,delete 五种类型的操作时,通过触发器自动产生增量数据,插入增量表中。

D、补救增量表中的数据,补救时,一定是按照自增id的顺序来补救,你你这个传输强度会非常低,没辦法 做批量操作,不然数据会错乱。  一群人以全是说,是全是都不能把insert操作合并在一块儿,modify合并在一块儿,delete操作合并在一块儿,有刚刚批量补救,我给的答案是不行,之后数据的增删剪是有顺序的,合并后,就那么 顺序了,同两根数据的增删剪顺序一旦错了,那数据同步就肯定错了。

市面上什么都有数据etl数据交换产品全是基于你你这个思想来做的。

E、 你你这个思想使用kettle 很容易就都不能实现,笔者之后在被委托人的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/73400673.html

4)、基于时间戳的增量同步

A、首先一点人不能 一张临时temp表,用来存取每次读取的待同步的数据,也要是把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据

B、一点人还不能 创建另一个 时间戳配置表,用于存放每次读取的补救完的数据的最后的时间戳。

C、每次从原表中读取数据时,先查询时间戳配置表,有刚刚就知道了查询原表时的开始时间戳。

D、根据时间戳读取到原表的数据,插入到临时表中,有刚刚再将临时表中的数据插入到目标表中。

E、从缓存表中读取出数据的最大时间戳,有刚刚更新到时间戳配置表中。缓存表的作用要是使用sql获取每次读取到的数据的最大的时间戳,当然哪几个全是删剪基于sql语句在kettle中来配置,才不能 之后的一张临时表。

2、    大数据时代下的数据同步

1)、基于数据库日志(比如mysql的binlog)的同步

一点人都知道什么都有数据库都支持了主从自动同步,尤其是mysql,都不能支持多主多从的模式。那么 一点人是全是都不能利用你你这个思想呢,答案当然是肯定的,mysql的主从同步的过程是之后的。

  A、master将改变记录到二进制日志(binary log)中(哪几个记录叫做二进制日志事件,binary log events,都不能通过show binlog events进行查看);

  B、slave将master的binary log events拷贝到它的中继日志(relay log);

  C、slave重做中继日志中的事件,将改变反映它被委托人的数据。

阿里巴巴开源的canal就完美的使用你你这个辦法 ,canal 伪装了另一个 Slave 去喝Master进行同步。

A、 canal模拟mysql slave的交互协议,伪装被委托人为mysql slave,向mysql master发送dump协议

B、 mysql master收到dump请求,开始推送binary log给slave(也要是canal)

C、 canal解析binary log对象(原始为byte流)

另外canal 在设计时,有点硬设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客户端: https://github.com/dotnetcore/CanalSharp

canal go客户端: https://github.com/CanalClient/canal-go

canal php客户端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

D、在使用canal时,mysql不能 开启binlog,有刚刚binlog-format不能 为row,都不能在mysql的my.cnf文件中增加如下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123、

E、 部署canal的服务端,配置canal.properties文件,有刚刚 启动 bin/startup.sh 或bin/startup.bat

#设置要监听的mysql服务器的地址和端口

canal.instance.master.address = 127.0.0.1:34006

#设置另一个 可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#连接的数据库

canal.instance.defaultDatabaseName =test

#订阅实例中所有的数据库和表

canal.instance.filter.regex = .*\\..*

#连接canal的端口

canal.port= 11111

#监听到的数据变更发送的队列

canal.destinations= example

F、 客户端开发,在maven中引入canal的依赖

   <dependency>
         <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.0.21</version>
      </dependency>

代码示例:

package com.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 
public class CanalClientExample {

    public static void main(String[] args) {
        while (true) {
            //连接canal
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
            connector.connect();
            //订阅 监控的 数据库.表
            connector.subscribe("demo_db.user_tab");
            //一次取10条
            Message msg = connector.getWithoutAck(10);

            long batchId = msg.getId();
            int size = msg.getEntries().size();
            if (batchId < 0 || size == 0) {
                System.out.println("那么

消息,休眠5秒");
                try {
                    Thread.sleep(40000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //
                CanalEntry.RowChange row = null;
                for (CanalEntry.Entry entry : msg.getEntries()) {
                    try {
                        row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                        for (CanalEntry.RowData rowdata : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                            Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                            if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    if ("id".equals(column.getName())) {
                                        //具体业务操作
                                        System.out.println("删除的id:" + column.getValue());
                                    }
                                }
                            } else {
                                System.out.println("一点操作类型不做补救");
                            }

                        }

                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                //确认消息
                connector.ack(batchId);
            }


        }
    }

    public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
        Map map = new HashMap();
        if (afterColumnsList != null && afterColumnsList.size() > 0) {
            for (CanalEntry.Column column : afterColumnsList) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }


}

2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase

 

一点人有五种辦法 都不能实现,

A、 使用spark任务,通过HQl读取数据,有刚刚再通过hbase的Api插入到hbase中。

有刚刚你你这个做法,传输强度很低,有刚刚大批量的数据一块儿插入Hbase,对Hbase的性能影响很大。

在大数据量的状况下,使用BulkLoad都不能快速导入,BulkLoad主要是借用了hbase的存储设计思想,之后hbase本质是存储在hdfs上的另一个 文件夹,有刚刚底层是以另一个 个的Hfile处在的。HFile的形式处在。Hfile的路径格式一般是之后的:

/hbase/data/default(默认是你你这个,之后hbase的表那么 指定命名空间语句,之后指定了,你你这个要是命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

B、 BulkLoad实现的原理要是按照HFile格式存储数据到HDFS上,生成Hfile都不能使用hadoop的MapReduce来实现。之后全是hive中的数据,比如内部管理的数据,那么 一点人都不能将内部管理的数据生成文件,有刚刚上传到hdfs中,组装RowKey,有刚刚将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

 

当然一点人也都不能不之后生成hfile,都不能使用spark任务直接从hive中读取数据转去掉 RDD,有刚刚使用HbaseContext的自动生成Hfile文件,部分关键代码如下:

…
//将DataFrame转换bulkload不能

的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
 
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })
…
//使用HBaseContext的bulkload生成HFile文件
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
 
    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
 
    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
 
    sc.stop()
  }
…
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

…

C、pg_bulkload的使用

这是另一个 支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过内部管理文件加载的辦法 ,你你这个工具笔者那么 亲自去用过,删剪的介绍都不能参考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/

3)、基于sqoop的全量导入

Sqoop 是hadoop生态中的另一个 工具,专门用于内部管理数据导入进入到hdfs中,内部管理数据导出时,支持什么都有常见的关系型数据库,也是在大数据中常用的另一个 数据导出导入的交换工具。

 

Sqoop从内部管理导入数据的流程图如下:

Sqoop将hdfs中的数据导出的流程如下:

本质全是用了大数据的数据分布式补救来快速的导入和导出数据。