When a Flink streaming job needs to write to a database, the usual approach is to extend RichSinkFunction. Without any optimization in front of the sink, records are written one at a time, which has two drawbacks:
1. Frequent writes put heavy pressure on the database.
2. Writes are slow and inefficient, which can cause backpressure.
Batch writes are therefore needed. This article uses a tumbling window to buffer each cycle's data into a batch and hand it down to the sink node. The approach has been validated in a high-volume production environment; beyond batching the writes, it also guards against data skew and supports parallelism. It is the product of a great deal of painstaking work.
Main function
KeyedStream<Row, String> keyedStream = sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));
DataStream<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));
DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction<>(conf, writeSql));
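For orientation, here is a minimal self-contained job that wires the three pieces together. This is a sketch only: the source, the SQL statement, and every parameter value are assumptions for illustration (the original post does not show them), and the author's DbConnectionPool class must exist on the classpath.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class WindowedBatchSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative values; in the real job these come from configuration.
        List<Integer> keyIndexList = Arrays.asList(0);  // key on column 0
        int paralleSize = 4;                            // number of key buckets
        long windowSize = 5000L;                        // batch every 5 s
        Properties conf = new Properties();             // pool settings (assumed type)
        String writeSql = "INSERT INTO t_demo (c1, c2) VALUES (?, ?)"; // hypothetical DML

        // Hypothetical bounded source standing in for the real business stream.
        DataStream<Row> sinkStream = env.fromElements(
                Row.of("user-1", 10), Row.of("user-2", 20), Row.of("user-1", 30));

        sinkStream
                .keyBy(new HashModKeySelector(keyIndexList, paralleSize))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
                .process(new RowProcessWindowFunction(keyIndexList))
                .addSink(new DbSinkFunction<Row[]>(conf, writeSql));

        env.execute("windowed-batch-db-sink");
    }
}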
1. Grouping the business data: HashModKeySelector
public class HashModKeySelector implements KeySelector<Row, String> {
    private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);
    private static final long serialVersionUID = 1L;
    /**
     * Indexes of the key columns within the Row
     */
    private List<Integer> keyIndexList = null;
    private Integer paralleSize;
    private Map<String, String> md5Map = new ConcurrentHashMap<>();

    public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
        this.keyIndexList = keyIndexList;
        this.paralleSize = paralleSize;
    }

    @Override
    public String getKey(Row value) {
        int size = keyIndexList.size();
        Row keyRow = new Row(size);
        for (int i = 0; i < size; i++) {
            int index = keyIndexList.get(i);
            keyRow.setField(i, value.getField(index));
        }
        // floorMod keeps the bucket non-negative even when hashCode() is negative,
        // so there are exactly paralleSize distinct buckets
        int keyHash = Math.floorMod(keyRow.hashCode(), paralleSize);
        String strKey = String.valueOf(keyHash);
        String md5Value = md5Map.get(strKey);
        if (StringUtils.isBlank(md5Value)) {
            md5Value = md5(strKey);
            md5Map.put(strKey, md5Value);
        }
        return md5Value;
    }

    public static String md5(String key) {
        String result = "";
        try {
            // Create an MD5 message digest
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Compute the digest of the key
            byte[] digest = md.digest(key.getBytes());
            // Convert the digest to a hex string
            result = bytesToHex(digest);
        } catch (Exception e) {
            logger.error("Failed to compute MD5 of {}:", key, e);
            return key;
        }
        return result;
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }
}
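A note on the MD5 step: Flink assigns keys to parallel subtasks by hashing key.hashCode() again, and a handful of short numeric strings such as "0"…"3" can easily land in the same key group; mapping each bucket number to its MD5 string presumably spreads the small key set more evenly, which is the anti-skew measure mentioned in the introduction. The selector can also be sanity-checked outside a Flink job; the rows, key column and bucket count below are made-up values:

import java.util.Arrays;

import org.apache.flink.types.Row;

public class KeySelectorDemo {
    public static void main(String[] args) {
        // Key on column 0, spread over 4 buckets (illustrative values)
        HashModKeySelector selector = new HashModKeySelector(Arrays.asList(0), 4);

        // Two hypothetical business rows: (id, amount)
        Row r1 = Row.of("user-1", 10);
        Row r2 = Row.of("user-2", 20);

        // Each row maps to one of at most 4 distinct MD5 strings
        System.out.println(selector.getKey(r1));
        System.out.println(selector.getKey(r2));
    }
}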
2. Buffering data with a tumbling window: collect the individual records into an array and send it downstream
public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {
    private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);
    /**
     * Indexes of the key columns within the Row
     */
    private List<Integer> keyIndexList;

    public RowProcessWindowFunction(List<Integer> keyIndexList) {
        if (keyIndexList == null || keyIndexList.isEmpty()) {
            LOG.error("keyIndexList is empty");
            throw new RuntimeException("keyIndexList is empty");
        }
        this.keyIndexList = keyIndexList;
    }

    @Override
    public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) throws Exception {
        List<Row> rowList = new ArrayList<>();
        for (Row row : inRow) {
            rowList.add(row);
        }
        // Emit the whole window as one array so the sink can write it in a single batch
        out.collect(rowList.toArray(new Row[0]));
    }
}
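One practical note: Flink's type extractor treats Row as a generic (Kryo-serialized) type, so the windowed stream's element type can be declared explicitly via org.apache.flink.api.common.typeinfo.Types. A sketch of the winStream assignment from the main function with a type hint added; the three column types are assumptions for illustration, not something the original post specifies:

DataStream<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList))
        // Explicit result type; the ROW component types are illustrative
        .returns(Types.OBJECT_ARRAY(Types.ROW(Types.STRING, Types.STRING, Types.INT)));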
3. Batch write
public class DbSinkFunction<I> extends RichSinkFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);
    private Properties conf = null;   // pool configuration (type assumed; the post does not show it)
    private String driver = null;
    private String sql = null;
    private int[] columnTypes;
    private transient DbConnectionPool pool;

    public DbSinkFunction(Properties conf, String dmlSql) {
        this.conf = conf;
        // The driver class name is assumed to travel inside the pool configuration
        this.driver = conf.getProperty("driver");
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool (DbConnectionPool is the author's own wrapper)
        pool = new DbConnectionPool(conf, driver);
    }
    @Override
    public void close() throws Exception {
        // Release resources
        super.close();
        // Close the connection pool if it was created
        if (pool != null) {
            pool.close();
        }
    }
    /**
     * Write to the database
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        boolean isBatch = false;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // Batched data from the window function
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                int length = rowArray.length;
                LOG.info("Row array:{}", length);
                int pending = 0;
                for (int i = 1; i <= length; i++) {
                    Row row = rowArray[i - 1];
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    pending++;
                    // Flush every 3000 rows so a huge window cannot build an unbounded batch
                    if (i % 3000 == 0) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // Single record
                isBatch = false;
                Row row = (Row) record;
                fillPreparedStatement(ps, row);
                ps.execute();
            } else {
                throw new RuntimeException("Unsupported record type " + record.getClass());
            }
        } catch (SQLException e) {
            LOG.warn("Write failed for [{}], falling back to single-row writes", logSql, e);
            if (connection != null) {
                connection.rollback();
            }
            if (isBatch) {
                doOneInsert(sql, connection, (Row[]) record);
            }
        } catch (Exception e) {
            LOG.error("Write failed", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }
    /**
     * Fall back to single-row writes when the batch fails
     *
     * @param sql
     * @param connection
     * @param rowArray
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        String logSql = sql.substring(0, sql.toUpperCase().indexOf("VALUES"));
        try {
            int allSize = rowArray.length;
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    // Skip the bad row and keep going
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            LOG.warn("Single-row fallback for [{}]: total={}, failed={}", logSql, allSize, errCount);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }
    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (!(null == stmt || stmt.isClosed())) {
                stmt.close();
            }
            if (!(null == conn || conn.isClosed())) {
                conn.close();
            }
        } catch (SQLException e) {
            LOG.warn("Failed to close JDBC resources", e);
        }
    }
}
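fillPreparedStatement is called in both write paths above but is not part of the original listing. A minimal sketch of what it might look like inside DbSinkFunction, assuming a straight positional mapping from Row fields to SQL parameters:

    /**
     * Hypothetical implementation; not shown in the original post.
     */
    private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
        for (int i = 0; i < row.getArity(); i++) {
            // JDBC parameters are 1-based; setObject defers type mapping to the driver
            ps.setObject(i + 1, row.getField(i));
        }
    }

For databases that are strict about types, switching on per-column metadata (for example the columnTypes array) and calling the typed setters would be safer.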