
Big Data Training: Flink Stream Processing API - Sink

Sink

Flink has no equivalent of Spark's foreach method that lets users iterate over the data and write output logic inline. All output to external systems must go through a Sink, and the final output step of a job is attached in a form like the following:

myDstream.addSink(new MySink(xxxx))

Flink ships with sinks for a number of frameworks out of the box; for anything else, users need to implement their own sink, along the lines of the sketch below.
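For reference, here is a minimal sketch of a user-defined sink (not from the original text): a RichSinkFunction that simply prints each element. The class name MyPrintSink and the print logic are illustrative only.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Hypothetical custom sink: prints every element it receives.
class MyPrintSink extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // Acquire connections or other resources here (nothing needed for printing).
  }
  override def invoke(value: String): Unit = {
    println("sink received: " + value)
  }
  override def close(): Unit = {
    // Release resources here.
  }
}

// Usage: myDstream.addSink(new MyPrintSink)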

1 Kafka

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

Add the following method to MyKafkaUtil:

def getProducer(topic: String): FlinkKafkaProducer011[String] = {
  new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())
}
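For context, MyKafkaUtil as a whole might look roughly like the sketch below. Only the new method appears in the original, so the surrounding object and the brokerList value are assumptions here.

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object MyKafkaUtil {
  // Hypothetical broker list; replace with your own Kafka cluster addresses.
  val brokerList = "hadoop1:9092,hadoop2:9092,hadoop3:9092"

  // Build a producer that writes plain strings to the given topic.
  def getProducer(topic: String): FlinkKafkaProducer011[String] = {
    new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())
  }
}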

Add the sink in the main function:

val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
sumDstream.map(chCount => chCount._1 + ":" + chCount._2).addSink(myKafkaProducer)

2 Redis

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object MyRedisUtil {

  val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop1").setPort(6379).build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](conf, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String, String)] {
    override def getCommandDescription: RedisCommandDescription = {
      // Write each (field, value) pair into the Redis hash "channel_count".
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
      // new RedisCommandDescription(RedisCommand.SET)
    }

    override def getValueFromData(t: (String, String)): String = t._2

    override def getKeyFromData(t: (String, String)): String = t._1
  }
}

Call it in the main function:

sumDstream.map(chCount => (chCount._1, chCount._2 + "")).addSink(MyRedisUtil.getRedisSink())
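The commented-out line in MyRedisMapper hints at a plain key/value layout. As a sketch (an assumption, not part of the original), a mapper using RedisCommand.SET would store each channel under its own Redis key instead of as a field of one hash:

// Hypothetical variant: one Redis string key per channel instead of one hash.
class MyRedisSetMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: (String, String)): String = t._1   // Redis key
  override def getValueFromData(t: (String, String)): String = t._2 // Redis value
}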

3 Elasticsearch

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>

Add MyEsUtil:

import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object MyEsUtil {

  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("hadoop1", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop2", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop3", 9200, "http"))

  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("trying to save: " + element)
        // Parse the incoming JSON string and turn it into an index request.
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
        indexer.add(indexRequest)
        println("saved 1 record")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
    // Maximum number of buffered actions before a bulk flush.
    sinkBuilder.setBulkFlushMaxActions(10)
    sinkBuilder.build()
  }
}

Call it in the main method:

// Send the detail records to Elasticsearch.
val esSink: ElasticsearchSink[String] = MyEsUtil.getElasticSearchSink("gmall0503_startup")
dstream.addSink(esSink)
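This sink assumes every element of dstream is a JSON string that fastjson can parse into an object. A minimal, self-contained sketch of the wiring is shown below; the sample element and job name are made up for illustration, and in the real job dstream comes from the Kafka source.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Hypothetical sample element standing in for the Kafka-sourced stream.
val dstream: DataStream[String] = env.fromElements(
  """{"mid":"mid_001","uid":"u_001","ch":"appstore","area":"beijing","ts":1600000000000}"""
)
dstream.addSink(MyEsUtil.getElasticSearchSink("gmall0503_startup"))
env.execute("es-sink-demo")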

4 JDBC custom sink

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>

Add MyJdbcSink:

import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import javax.sql.DataSource
import com.alibaba.druid.pool.DruidDataSourceFactory
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MyJdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {

  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop2:3306/gmall1111?useSSL=false"
  val username = "root"
  val password = "123123"
  val maxActive = "20"

  var connection: Connection = _

  // Create the connection (called once per parallel instance).
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName", driver)
    properties.put("url", url)
    properties.put("username", username)
    properties.put("password", password)
    properties.put("maxActive", maxActive)
    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // Called once for every element of the stream.
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    println(values.mkString(","))
    for (i <- 0 until values.length) {
      ps.setObject(i + 1, values(i))
    }
    ps.executeUpdate()
    ps.close()
  }

  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
  }
}

Add the following to the main method to save the detail records to MySQL:

val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_, classOf[StartUpLog]) }
val jdbcSink = new MyJdbcSink("insert into z_startup values(?,?,?,?,?)")
startUplogDstream.map(startuplog => Array(startuplog.mid, startuplog.uid, startuplog.ch, startuplog.area, startuplog.ts)).addSink(jdbcSink)
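StartUpLog itself is not defined in this section. Based on the five fields used above, a rough sketch of the case class might look like this; the field types and any additional fields are assumptions, and the real class should match the z_startup table and the JSON produced upstream.

// Hypothetical shape of the startup-log record; adjust fields/types to your actual data.
case class StartUpLog(
  mid: String,   // device id
  uid: String,   // user id
  ch: String,    // channel
  area: String,  // area
  ts: Long       // event timestamp (ms)
)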
