Flink has no equivalent of Spark's foreach method that lets users iterate over results themselves. All output to external systems has to go through a Sink, and the final output step of a job is wired up roughly like this:
myDstream.addSink(new MySink(xxxx))
Flink ships with sinks for a number of frameworks out of the box; for everything else you have to implement a custom sink yourself.
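A custom sink is simply a class that extends RichSinkFunction (or SinkFunction). The sketch below is only illustrative: the MySink name and its single constructor argument mirror the placeholder above, the String element type is an assumption, and the body just prints each record to show where the three lifecycle hooks go.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Illustrative only: a custom sink that prints every element it receives.
class MySink(prefix: String) extends RichSinkFunction[String] {

  // Called once per parallel instance before any data arrives; open connections here.
  override def open(parameters: Configuration): Unit =
    println("sink opened")

  // Called once for every element of the stream.
  override def invoke(value: String): Unit =
    println(prefix + value)

  // Called when the job finishes or is cancelled; release resources here.
  override def close(): Unit =
    println("sink closed")
}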
pom.xml (Kafka connector dependency):
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
Add a producer method to MyKafkaUtil:
def getProducer(topic: String): FlinkKafkaProducer011[String] = {
  new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())
}
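The getProducer method refers to a brokerList value that MyKafkaUtil already defines elsewhere in the project (it also hosts the consumer helpers from earlier chapters). Purely as a reference, a minimal sketch of the surrounding object could look like this; the broker host names and ports are assumptions:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object MyKafkaUtil {

  // Assumed broker list; adjust to your own Kafka cluster.
  val brokerList = "hadoop1:9092,hadoop2:9092,hadoop3:9092"

  // Producer that writes a stream of plain strings to the given topic.
  def getProducer(topic: String): FlinkKafkaProducer011[String] = {
    new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())
  }
}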
Add the sink in the main function:
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")

sumDstream.map(chCount => chCount._1 + ":" + chCount._2).addSink(myKafkaProducer)
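Note that sumDstream is not defined in this snippet; it is assumed to be the per-channel count stream computed earlier, for example a DataStream[(String, Int)]. Purely for exercising the Kafka sink locally, a hypothetical stand-in could be:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Hypothetical stand-in for sumDstream, only for trying out the sink locally.
val sumDstream: DataStream[(String, Int)] = env.fromElements(("appstore", 3), ("huawei", 5))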
pom.xml (Bahir Redis connector dependency):
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object MyRedisUtil {

  val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop1").setPort(6379).build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](conf, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String, String)] {

    // Write every (channel, count) pair into the Redis hash "channel_count" with HSET.
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
      // new RedisCommandDescription(RedisCommand.SET)
    }

    override def getValueFromData(t: (String, String)): String = t._2

    override def getKeyFromData(t: (String, String)): String = t._1
  }
}
Call it in the main function:
sumDstream.map(chCount => (chCount._1, chCount._2 + "")).addSink(MyRedisUtil.getRedisSink())
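The commented-out RedisCommandDescription(RedisCommand.SET) line in MyRedisMapper hints at an alternative layout: one plain string key per channel instead of a single hash. A hedged sketch of such a mapper is shown below; the channel_count: key prefix is an assumption, not something from the original code:

class MyRedisSetMapper extends RedisMapper[(String, String)] {

  // Plain SET: every channel becomes its own Redis string key.
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  // Prefix the channel so the keys are easy to find, e.g. "channel_count:appstore".
  override def getKeyFromData(t: (String, String)): String = "channel_count:" + t._1

  override def getValueFromData(t: (String, String)): String = t._2
}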
pom.xml (Elasticsearch connector dependencies):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
Add MyEsUtil:
import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object MyEsUtil {

  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("hadoop1", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop2", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop3", 9200, "http"))

  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("trying to save: " + element)
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
        indexer.add(indexRequest)
        println("saved 1 record")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)

    // Maximum number of buffered actions before a bulk flush.
    sinkBuilder.setBulkFlushMaxActions(10)

    sinkBuilder.build()
  }
}
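A practical note on setBulkFlushMaxActions(10): records stay in the buffer until ten index requests have accumulated, which can look like data loss on a quiet stream. The builder also accepts a time-based flush and a failure handler; the two calls below are an optional sketch, not part of the original utility, and would sit just before sinkBuilder.build():

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler

// Optional: also flush on a timer (milliseconds), not only after 10 buffered actions.
sinkBuilder.setBulkFlushInterval(1000)

// Optional: retry bulk requests that Elasticsearch rejected because its queue was full.
sinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler())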
Call it in the main method:
// Send the detail records to Elasticsearch.
val esSink: ElasticsearchSink[String] = MyEsUtil.getElasticSearchSink("gmall0503_startup")

dstream.addSink(esSink)
pom.xml (MySQL JDBC driver and Druid connection pool dependencies):

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
Add MyJdbcSink:
import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MyJdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {

  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop2:3306/gmall1111?useSSL=false"
  val username = "root"
  val password = "123123"
  val maxActive = "20"

  var connection: Connection = null

  // Create the connection via a Druid pool (called once per parallel instance).
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName", driver)
    properties.put("url", url)
    properties.put("username", username)
    properties.put("password", password)
    properties.put("maxActive", maxActive)

    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // Called repeatedly, once for every element of the stream.
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    println(values.mkString(","))
    for (i <- 0 until values.length) {
      ps.setObject(i + 1, values(i))
    }
    ps.executeUpdate()
    // Close the statement so it does not leak on every record.
    ps.close()
  }

  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
  }
}
In the main method, add the following to save the detail records to MySQL:
val startUplogDstream: DataStream[StartUpLog] = dstream.map { JSON.parseObject(_, classOf[StartUpLog]) }

val jdbcSink = new MyJdbcSink("insert into z_startup values(?,?,?,?,?)")

startUplogDstream.map(startuplog => Array(startuplog.mid, startuplog.uid, startuplog.ch, startuplog.area, startuplog.ts)).addSink(jdbcSink)
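StartUpLog is defined elsewhere in the project; the mapping above only touches five of its fields. A minimal hypothetical stand-in is sketched below, with the field types as assumptions (the real class may carry additional fields):

// Hypothetical minimal version of StartUpLog; field names follow the usage above.
case class StartUpLog(mid: String, uid: String, ch: String, area: String, ts: Long)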