
Flink Sink: Writing Streaming Data to a Database in Batches

Overview

Writing Flink streaming data to a database is a very common requirement, and it is usually implemented by extending RichSinkFunction. Without any optimization in front of the sink, records are written one at a time, which has several drawbacks:
1. Frequent writes put heavy pressure on the database.
2. Writing is slow and inefficient, which can lead to backpressure.
Batch writing is therefore needed. This article uses a tumbling window to buffer each period's records into a batch and hand that batch down to the sink node. The approach has been validated in a high-volume production environment; beyond batch writing itself, it also includes optimizations for preventing data skew and supporting parallelism.

Implementing the batch write

Main function

KeyedStream<Row, String> keyedStream = sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));
SingleOutputStreamOperator<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));
DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction<>(conf, writeSql));
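To see how these three pieces fit together before going through each one, here is a minimal, self-contained sketch of how the pipeline might be wired into a job. The in-memory source, the connection properties, the table in writeSql, and all parameter values are placeholders for illustration; adapt them to your own environment.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class BatchSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical in-memory source of Row records; replace with your real source (Kafka, CDC, ...)
        DataStream<Row> sinkStream = env
                .fromElements(Row.of(1, "a", 10L), Row.of(2, "b", 20L), Row.of(1, "c", 30L))
                .returns(Types.ROW(Types.INT, Types.STRING, Types.LONG));

        List<Integer> keyIndexList = Arrays.asList(0); // key = first field of the Row
        int paralleSize = 4;                           // number of distinct key groups
        long windowSize = 5000L;                       // 5 s tumbling window -> one batch per key every 5 s

        Properties conf = new Properties();            // DB connection settings, see section 3
        String writeSql = "INSERT INTO demo_table (id, name, ts) VALUES (?, ?, ?)"; // hypothetical table

        sinkStream
                .keyBy(new HashModKeySelector(keyIndexList, paralleSize))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
                .process(new RowProcessWindowFunction(keyIndexList))
                .addSink(new DbSinkFunction<>(conf, writeSql))
                .name("db-batch-sink");

        env.execute("flink-batch-db-sink");
    }
}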

1. Group the business data with HashModKeySelector

import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashModKeySelector implements KeySelector<Row, String> {
	private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);
	private static final long serialVersionUID = 1L;
	/**
	 * Indexes of the key fields within the Row
	 */
	private List<Integer> keyIndexList;
	private Integer paralleSize;
	/** Cache of group id -> MD5 so the digest is only computed once per key group */
	private Map<String, String> md5Map = new ConcurrentHashMap<>();

	public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
		this.keyIndexList = keyIndexList;
		this.paralleSize = paralleSize;
	}

	@Override
	public String getKey(Row value) {
		// Build a Row containing only the key fields
		int size = keyIndexList.size();
		Row keyRow = new Row(size);
		for (int i = 0; i < size; i++) {
			int index = keyIndexList.get(i);
			keyRow.setField(i, value.getField(index));
		}
		// floorMod keeps the group id in [0, paralleSize) even for negative hash codes
		int keyHash = Math.floorMod(keyRow.hashCode(), paralleSize);
		String strKey = String.valueOf(keyHash);
		String md5Value = md5Map.get(strKey);
		if (StringUtils.isBlank(md5Value)) {
			md5Value = md5(strKey);
			md5Map.put(strKey, md5Value);
		}
		return md5Value;
	}

	public static String md5(String key) {
		String result = "";
		try {
			// Create an MD5 message digest
			MessageDigest md = MessageDigest.getInstance("MD5");
			// Digest the key
			byte[] digest = md.digest(key.getBytes());
			// Convert the digest to a hex string
			result = bytesToHex(digest);
		} catch (Exception e) {
			logger.error("Failed to compute MD5 of {}:", key, e);
			return key;
		}
		return result;
	}

	public static String bytesToHex(byte[] bytes) {
		StringBuilder hexString = new StringBuilder();
		for (byte b : bytes) {
			String hex = Integer.toHexString(0xff & b);
			if (hex.length() == 1) {
				hexString.append('0');
			}
			hexString.append(hex);
		}
		return hexString.toString();
	}
}
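The point of this selector is to collapse arbitrary business keys into at most paralleSize groups, so the number of parallel window instances (and therefore of concurrent batches) stays bounded; the MD5 step is presumably there to scatter those few group ids across Flink's key groups more evenly than the raw strings "0".."N" would. A quick standalone sanity check, with made-up rows and parameters, might look like this sketch:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.types.Row;

public class HashModKeySelectorDemo {
    public static void main(String[] args) throws Exception {
        int paralleSize = 4;
        // Key on fields 0 and 1 of each Row (the rows below are invented for the test)
        HashModKeySelector selector = new HashModKeySelector(Arrays.asList(0, 1), paralleSize);

        Set<String> keys = new HashSet<>();
        for (int i = 0; i < 1000; i++) {
            Row row = Row.of(i, "user-" + (i % 10), System.currentTimeMillis());
            keys.add(selector.getKey(row));
        }
        // Never more than paralleSize distinct keys, so at most paralleSize parallel groups downstream
        System.out.println("distinct keys = " + keys.size() + " (<= " + paralleSize + ")");
    }
}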

2. Buffer data with a tumbling window, collect the individual records into a batch, and send it downstream

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {
	private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);
	/**
	 * Indexes of the key fields within the Row
	 */
	private List<Integer> keyIndexList;

	public RowProcessWindowFunction(List<Integer> keyIndexList) {
		if (keyIndexList == null || keyIndexList.isEmpty()) {
			LOG.error("keyIndexList is empty");
			throw new RuntimeException("keyIndexList is empty");
		}
		this.keyIndexList = keyIndexList;
	}

	@Override
	public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) throws Exception {
		// Drain the window's buffered records into one array and emit it as a single batch
		List<Row> rowList = new ArrayList<>();
		for (Row row : inRow) {
			rowList.add(row);
		}
		out.collect(rowList.toArray(new Row[0]));
	}
}
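Since the function does nothing more than turn the window's Iterable into a single array, it can be exercised outside a cluster by calling process() directly with a throwaway Collector. The Context argument is unused by the function, so null is passed here purely for the sake of the sketch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class RowProcessWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        List<Row[]> emitted = new ArrayList<>();
        Collector<Row[]> collector = new Collector<Row[]>() {
            @Override public void collect(Row[] batch) { emitted.add(batch); }
            @Override public void close() { }
        };

        RowProcessWindowFunction fn = new RowProcessWindowFunction(Arrays.asList(0));
        fn.process("some-key", null,
                Arrays.asList(Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c")),
                collector);

        // One window firing -> one downstream element containing the whole batch
        System.out.println("batches = " + emitted.size() + ", batch size = " + emitted.get(0).length);
    }
}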

3. Batch write

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbSinkFunction<I> extends RichSinkFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);
    private static final int BATCH_SIZE = 3000;

    /** DB connection settings; the Properties type is assumed here, adapt to what your DbConnectionPool expects */
    private Properties conf;
    /** JDBC driver class name, assumed to be carried in the connection properties */
    private String driver;
    private String sql;
    private DbConnectionPool pool;

    public DbSinkFunction(Properties conf, String dmlSql) {
        this.conf = conf;
        this.driver = conf.getProperty("driver");
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool (DbConnectionPool is the author's own pool wrapper)
        pool = new DbConnectionPool(conf, driver);
    }

    @Override
    public void close() throws Exception {
        // Release resources
        super.close();
        if (pool != null) {
            pool.close();
        }
    }

    /**
     * Write to the database
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        boolean isBatch = false;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // Batched data coming from the window operator
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                LOG.info("Row array:{}", rowArray.length);
                int pending = 0;
                for (Row row : rowArray) {
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    pending++;
                    // Flush every BATCH_SIZE rows to keep the JDBC batch bounded
                    if (pending == BATCH_SIZE) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // Single record
                isBatch = false;
                fillPreparedStatement(ps, (Row) record);
                ps.execute();
            } else {
                throw new RuntimeException("Unsupported record type " + record.getClass());
            }
        } catch (SQLException e) {
            LOG.error("Batch write failed, sql:{}", logSql, e);
            if (connection != null) {
                connection.rollback();
            }
            if (isBatch) {
                // Fall back to row-by-row inserts so one bad row does not lose the whole batch
                doOneInsert(sql, connection, (Row[]) record);
            }
        } catch (Exception e) {
            LOG.error("Write failed, sql:{}", logSql, e);
        } finally {
            closeDBResources(ps, connection);
        }
    }

    /**
     * Row-by-row fallback used when the batch fails
     *
     * @param sql
     * @param connection
     * @param rowArray
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        try {
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            if (errCount > 0) {
                LOG.warn("{} of {} rows failed, sql:{}", errCount, rowArray.length, logSql);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }

    /**
     * Bind one Row's fields to the statement. Minimal version: every field is bound
     * positionally with setObject; adapt this to your actual column types.
     */
    private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
        for (int i = 0; i < row.getArity(); i++) {
            ps.setObject(i + 1, row.getField(i));
        }
    }

    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (stmt != null && !stmt.isClosed()) {
                stmt.close();
            }
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        } catch (SQLException e) {
            LOG.warn("Failed to close DB resources", e);
        }
    }
}
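Note that the sink builds logSql by cutting the statement at the VALUES keyword, and the minimal fillPreparedStatement above binds Row fields positionally, so writeSql should be a plain parameterized INSERT with one ? per Row field in field order. A hypothetical wiring follows; the table, columns, and property keys are made up, and the keys DbConnectionPool actually reads depend on your own pool implementation:

import java.util.Properties;

import org.apache.flink.types.Row;

public class DbSinkWiring {
    public static void main(String[] args) {
        // Connection settings consumed by DbConnectionPool (property keys are placeholders)
        Properties conf = new Properties();
        conf.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        conf.setProperty("url", "jdbc:mysql://localhost:3306/demo");
        conf.setProperty("user", "demo");
        conf.setProperty("password", "***");

        // One ? per Row field, in the same order as the fields in the Row
        String writeSql = "INSERT INTO demo_table (id, name, ts) VALUES (?, ?, ?)";

        DbSinkFunction<Row[]> sink = new DbSinkFunction<>(conf, writeSql);
        // winStream.addSink(sink), as in the main function above
    }
}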