当前位置:   article > 正文

Flink 自定义Sink端_flink addsink

flink addsink

Sink介绍

在Fink官网中sink端只是给出了常规的write api.在我们实际开发场景中需要将flink处理的数据写入kafka,hbase kudu等外部系统。

UML关系

自定义Sink需要实现父类的接口和继承抽象类。

在这里插入图片描述上面是Sink的继承关系

Flink addSink

// 方法需要SinkFunction的对象
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();

		// configure the type if needed
		if (sinkFunction instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
		}

		StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

		DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

		getExecutionEnvironment().addOperator(sink.getTransformation());
		return sink;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

SinkFunction

// SinkFunction是一个接口
public interface SinkFunction<IN> extends Function, Serializable {
   //公共方法
	default void invoke(IN value, Context context) throws Exception {
		invoke(value);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

RichSinkFunction

@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

	private static final long serialVersionUID = 1L;
}
  • 1
  • 2
  • 3
  • 4
  • 5

其他继承接口SinkFunction的类:

案例

自定义HbaseSink

public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> {
    Logger logger = LoggerFactory.getLogger(HbaseSink.class);
    org.apache.hadoop.conf.Configuration configuration;
    Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //获取hbase 的链接信息
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
        //创建conn
        connection = ConnectionFactory.createConnection(configuration);
        logger.info("创建链接成功");
    }

    @Override
    public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
        //往habse 里面插入数据
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Table table = connection.getTable(TableName.valueOf("torder_count"));
        Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8));
        put.addColumn("info".getBytes(), // 列族
                "order_total".getBytes(StandardCharsets.UTF_8), //特征字段
                value.f0.toString().getBytes()); //属性值
        put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes());
        table.put(put);
        table.close();
        logger.info("=====一条数据写入成功======,时间:"+value.f1+", 值:"+value.f0);
    }
    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

通过以上案例我们熟悉了addSink函数的操作。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/615103
推荐阅读
相关标签
  

闽ICP备14008679号