myDstream.addSink(new MySink(xxxx))
Flink ships with sinks for a number of frameworks out of the box. Beyond those, you have to implement a custom sink yourself.
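For reference, a minimal sketch of what such a custom sink could look like (the class name MySink and its constructor argument are placeholders from the snippet above; the body is only illustrative):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// A minimal custom sink: open() builds the connection once per parallel instance,
// invoke() is called for every record, close() releases resources.
class MySink(url: String) extends RichSinkFunction[String] {

  override def open(parameters: Configuration): Unit = {
    // set up the client / connection to the external system here
  }

  override def invoke(value: String): Unit = {
    // write one record to the external system (placeholder)
    println(s"writing $value to $url")
  }

  override def close(): Unit = {
    // release the connection here
  }
}

It is then attached with addSink exactly like the built-in connector sinks, as in the snippet above.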
Below are several sink demos. You need to create a Maven project and import the pom dependencies first; I've put the pom at the end of this post.
Make sure the versions match: my Scala version is 2.11.8 and my Flink version is 1.7.0.
Before running the programs below, remember to start the Kafka cluster. (When running each type of sink, also start the corresponding service, e.g. the Redis cluster or the ES cluster.)
Since there is no real data, for every test I manually push records into the Kafka topic accesslogs; Flink then reads them from that topic
and writes them to the corresponding sink.
These are the two records; I just type the same two lines in again every time... haha
{"area":"shandong","uid":"68","os":"android","ch":"360","appid":"gmall8888","mid":"mid_437","type":"stratup","vs":"1.2.0","ts":18561597017666}
{"area":"shandong","uid":"68","os":"apple","ch":"appstore","appid":"gmall8888","mid":"mid_437","type":"stratup","vs":"1.2.0","ts":18561597017666}
The Kafka commands for starting a console producer and consumer:
[root@hadoop01 ~]# kafka-console-producer.sh --topic accesslogs --broker-list hadoop01:9092
[root@hadoop01 ~]# kafka-console-consumer.sh --topic accesslogs --bootstrap-server hadoop01:9092
package com.flink.sourceAndSink

import com.alibaba.fastjson.JSON
import com.flink.sourceAndSink.bean.Startup
import com.flink.util.{MyEsUtil, MyKafkaUtil}
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
 * Created by Shi shuai RollerQing on 2019/12/4 19:42
 *
 * Flink connects to Kafka and reads the data in the topic
 */
object StreamApiApp2 {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaSource: FlinkKafkaConsumer[String] = MyKafkaUtil.getKafkaSource("accesslogs")
    val dStream = env.addSource(kafkaSource)

    val startupLogDStream: DataStream[Startup] = dStream.filter(_.nonEmpty).map(jsonStr => JSON.parseObject(jsonStr, classOf[Startup]))

    /***
     * Split the stream by tag: split() stamps each element with tags, select() picks streams by tag
     **/
    val splitStream: SplitStream[Startup] = startupLogDStream.split(startupLog => {
      var flag: List[String] = null
      if (startupLog.ch == "appstore") {
        flag = List("apple", "usa")
      } else if (startupLog.ch == "huawei") {
        flag = List("android", "china")
      } else {
        flag = List("android", "other")
      }
      flag
    })
    val appleStream = splitStream.select("apple", "china")
    val otherStream: DataStream[Startup] = splitStream.select("other")
    appleStream.print("apple")
    otherStream.print("other")

    // --------------------------------------------------------------------------------

    /***
     * Kafka sink: write to the specified topic
     **/
    val kafkaSink: FlinkKafkaProducer[String] = MyKafkaUtil.getKafkaSink("topic-apple")
    appleStream.map(startLog => startLog.ch).addSink(kafkaSink)

    env.execute()
  }
}
All three sink tests reuse the same program, StreamApiApp2.
For example, when testing Redis I just comment out the few Kafka sink lines written above; I'm lazy...
Like this:
......
......
......
/***
 * Kafka sink: write to the specified topic
 **/
val kafkaSink: FlinkKafkaProducer[String] = MyKafkaUtil.getKafkaSink("topic-apple")
appleStream.map(startLog => startLog.ch).addSink(kafkaSink)
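To check the Kafka sink, you can point the console consumer shown earlier at the output topic instead, e.g. kafka-console-consumer.sh --topic topic-apple --bootstrap-server hadoop01:9092, and watch the channel values arrive.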
package com.flink.util

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object MyKafkaUtil {

  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "hadoop01:9092")
  prop.setProperty("group.id", "flinkConnect")

  // The source: a consumer that reads the given topic
  def getKafkaSource(topic: String): FlinkKafkaConsumer[String] = {
    val kafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    kafkaConsumer
  }

  // The sink: a producer that writes to the given topic
  def getKafkaSink(topic: String): FlinkKafkaProducer[String] = {
    val kafkaProducer: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String]("hadoop01:9092", topic, new SimpleStringSchema())
    kafkaProducer
  }
}
I'm connecting to a Redis cluster: three machines, six Redis instances.
/***
 * Write the (key, value) results to Redis.
 *
 * Below we compute the cumulative count per channel with keyBy + reduce,
 * then write the result to Redis.
 *
 * HSET, i.e. hash set:
 * key   : channel_sum
 * field : channel
 * value : count
 **/
......
......
......
val chKeyedStream: KeyedStream[(String, Int), Tuple] = startupLogDStream.map(startLog => (startLog.ch, 1)).keyBy(0)
val chSumDStream: DataStream[(String, Int)] = chKeyedStream.reduce { (ch1, ch2) => (ch1._1, ch1._2 + ch2._2) }
chSumDStream.print("shishuai:").setParallelism(1)
val redisSink: RedisSink[(String, Int)] = RedisUtil.getRedisSink()
chSumDStream.addSink(redisSink)

// Same test data as before
//{"area":"shandong","uid":"68","os":"android","ch":"360","appid":"gmall8888","mid":"mid_437","type":"stratup","vs":"1.2.0","ts":18561597017666}
//{"area":"shandong","uid":"68","os":"apple","ch":"appstore","appid":"gmall8888","mid":"mid_437","type":"stratup","vs":"1.2.0","ts":18561597017666}
// Start the Kafka cluster, then the console producer: kafka-console-producer.sh --topic accesslogs --broker-list hadoop01:9092
// Start the Redis cluster: three servers, six nodes, so the start command has to be run six times by hand: [root@hadoop01 redis02]# ./src/redis-server ./redis.conf
// After it is up, connect with the client to check: [root@hadoop01 redis02]# ./src/redis-cli -h hadoop01 -p 7002 -c
// keys *
// hadoop01:7002> HGETALL channel_sum
package com.flink.util

import java.net.InetSocketAddress
import java.util._

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import redis.clients.jedis.HostAndPort

/**
 * Created by Shi shuai RollerQing on 2019/12/4 21:53
 */
object RedisUtil {

  /**
   * Single-node Redis connection
   */
  // val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop01").setPort(6379).build()

  /**
   * Redis cluster connection
   */
  var jedisClusterNodes: Set[InetSocketAddress] = new HashSet[InetSocketAddress]()
  jedisClusterNodes.add(new InetSocketAddress("192.168.37.111", 7001))
  jedisClusterNodes.add(new InetSocketAddress("192.168.37.111", 7002))
  jedisClusterNodes.add(new InetSocketAddress("192.168.37.112", 7003))
  jedisClusterNodes.add(new InetSocketAddress("192.168.37.112", 7004))
  jedisClusterNodes.add(new InetSocketAddress("192.168.37.113", 7005))
  jedisClusterNodes.add(new InetSocketAddress("192.168.37.113", 7006))
  val conf = new FlinkJedisClusterConfig.Builder().setNodes(jedisClusterNodes).build()

  def getRedisSink(): RedisSink[(String, Int)] = {
    // Just like the Kafka sink, this sink takes two arguments:
    // the first says where to write, the second says how to write
    new RedisSink[(String, Int)](conf, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String, Int)] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_sum")
    }

    override def getKeyFromData(data: (String, Int)): String = data._1

    override def getValueFromData(data: (String, Int)): String = data._2.toString
  }
}
Connect with the client to check. Since I hadn't used Redis in a long time, the old data was all gone. After running the job, the key channel_sum shows up; this is the key we hardcoded in the program,
because we use a hash (HSET), which has three parts: key, field and value, while the overridden RedisMapper methods can only supply two of them, so the key has to be hardcoded.
Check all the data with HGETALL... the result looks correct.
1. One error I hit here: ES index names must be lowercase.
2. Alibaba's fastjson cannot serialize a case class to JSON: https://blog.csdn.net/C_time/article/details/103400765
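For the second issue, the workaround used in MyEsUtil below is to serialize with json4s instead of fastjson. A minimal sketch, assuming the Startup case class inferred earlier:

import org.json4s.native.Serialization

// json4s needs implicit Formats in scope to serialize a case class
implicit val formats = org.json4s.DefaultFormats

val startup = Startup("shandong", "68", "android", "360", "gmall8888", "mid_437", "stratup", "1.2.0", 18561597017666L)
val jsonStr: String = Serialization.write(startup) // {"area":"shandong","uid":"68",...}
println(jsonStr)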
......
......
......
/***
 * Write the Startup objects to ES
 *
 * No problems: still the same two records as above, pushed into Kafka. Remember to start the ES cluster before starting the program,
 * then check the result with the head plugin. Looks fine.
 **/
//Caused by: [getFlinkObject] ElasticsearchException[Elasticsearch
// exception [type=invalid_index_name_exception, reason=Invalid index name [getFlinkObject], must be lowercase]]
// So the index name must be lowercase???
val esSink: ElasticsearchSink[Startup] = MyEsUtil.getEsSink("getflinkobject")
appleStream.addSink(esSink)
package com.flink.util

import java.util

import com.flink.sourceAndSink.bean.Startup
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
 * Created by Shi shuai RollerQing on 2019/12/5 9:44
 */
// implicit Formats need to be in scope for the json4s serialization below
object MyEsUtil {

  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("hadoop01", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop02", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop03", 9200, "http"))

  def getEsSink(indexName: String): ElasticsearchSink[Startup] = {
    val esSinkBuilder: ElasticsearchSink.Builder[Startup] =
      new ElasticsearchSink.Builder[Startup](httpHosts, new MyElasticSearchFunction(indexName))
    esSinkBuilder.setBulkFlushMaxActions(2) // bulk insert: flush to ES once every x records
    val esSink: ElasticsearchSink[Startup] = esSinkBuilder.build()
    esSink
  }

  class MyElasticSearchFunction(indexName: String) extends ElasticsearchSinkFunction[Startup] {
    override def process(t: Startup, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
      println("trying to save: " + t)

      // fastjson cannot serialize a case class, so json4s is used here instead
      import org.json4s.native.Serialization
      implicit val formats = org.json4s.DefaultFormats
      val jsonStr: String = Serialization.write(t)

      val json = new util.HashMap[String, String]()
      json.put("data", jsonStr)

      val indexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(json)
      requestIndexer.add(indexRequest)
      println("saved 1 record")
    }
  }
}
The pom dependencies mentioned at the beginning:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sparkCore</artifactId>
        <groupId>org.good.programmer</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>

    <dependencies>
        <!-- Flink batch (bounded data) processing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- Flink streaming (unbounded data) processing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.3</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>