当前位置:   article > 正文

Flink的sink实战之四:自定义,灵魂拷问_flink 自定义sink

flink 自定义sink

}

public void setAge(int age) {

this.age = age;

}

public Student(String name, int age) {

this.name = name;

this.age = age;

}

}

  1. 创建自定义sink类MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:

package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction {

PreparedStatement preparedStatement;

private Connection connection;

private ReentrantLock reentrantLock = new ReentrantLock();

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

//准备数据库相关实例

buildPreparedStatement();

}

@Override

public void close() throws Exception {

super.close();

try{

if(null!=preparedStatement) {

p

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/647466
推荐阅读
相关标签
  

闽ICP备14008679号