Step 1: Create a Maven project and add the dependencies
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- plugin to pin the JDK version -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- plugin needed to compile the Scala sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- plugin used to package the project -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Step 2: Write Flink code to count the words coming in over a socket
Write Flink code that receives word data from a socket and then aggregates it.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

case class CountWord(word: String, count: Long)

object FlinkCount {
  def main(args: Array[String]): Unit = {
    // get the program entry point
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read the data from the socket
    val result: DataStream[String] = environment.socketTextStream("node01", 9000)
    // import the implicit conversions; the operators below will not compile without them
    import org.apache.flink.api.scala._
    // split the data, wrap it in the case class, then aggregate it
    val resultValue: DataStream[CountWord] = result
      .flatMap(x => x.split(" "))
      .map(x => CountWord(x, 1))
      .keyBy("word")
      // .timeWindow(Time.seconds(1), Time.milliseconds(1)) // aggregate over a one-second time window with the configured slide interval
      .sum("count")
    // print the final result
    resultValue.print().setParallelism(1)
    // start the job
    environment.execute()
  }
}
Step 3: Package the program and run it on the server
Package the program, upload it to the node01 server, and then try running it under the different deployment modes.
1. Running the program in standalone mode
Step 1: Start the Flink cluster
Run the following commands on node01 to start the Flink cluster
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh
Step 2: Start the socket service on node01 and submit the Flink job
Run the following command on node01 to start the socket service
nc -lk 9000
Submit the job
Upload the packaged jar to the /kkb directory on the node01 server and then submit the job. Note that the packaging plugin must be added to pom.xml, and any dependency that already exists on the cluster (the Flink jars) should be declared with scope provided before packaging, for example as in the snippet below. Then upload the resulting jar-with-dependencies jar to the /kkb directory on node01.
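For example, a minimal sketch of marking one of the Flink dependencies from the pom above as provided (apply the same scope to the other Flink jars the cluster already ships):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.1</version>
    <!-- already available on the Flink cluster, so keep it out of the fat jar -->
    <scope>provided</scope>
</dependency>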
Run the following commands on node01 to submit the job
cd /kkb/install/flink-1.8.1/
bin/flink run --class com.kkb.flink.demo1.FlinkCount /kkb/flink_day01-1.0-SNAPSHOT-jar-with-dependencies.jar
Step 3: Check the result
Check the output on node01
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-1-node01.kaikeba.com.out
Note: the results are written to the files ending in .out; check whichever of those files contains data.
The pom dependencies are as follows:
<properties>
<flink.version>1.9.0</flink.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Code (Java)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // utility class provided by Flink for reading the arguments passed on the command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        // step 1: get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // step 2: get the data source
        DataStream<String> dataStream = env.socketTextStream(hostname, port);
        // step 3: apply the processing logic
        DataStream<WordCount> wordAndOneStream = dataStream.flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String line, Collector<WordCount> out) {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new WordCount(word, 1L));
                }
            }
        });
        DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1)) // every 1 second, compute over the last 2 seconds
                .sum("count");
        // step 4: print the result
        resultStream.print();
        // step 5: start the job
        env.execute("WindowWordCountJava");
    }

    public static class WordCount {
        public String word;
        public long count;

        // remember to keep this no-argument constructor
        public WordCount() {
        }

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
Code (Scala)
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sliding window:
 * every second, count the data of the last two seconds and print it to the console.
 */
object WindowWordCountScala {
  def main(args: Array[String]): Unit = {
    // read the arguments
    val hostname = ParameterTool.fromArgs(args).get("hostname")
    val port = ParameterTool.fromArgs(args).getInt("port")
    // TODO import the implicit conversions
    import org.apache.flink.api.scala._
    // step 1: get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // step 2: get the data source
    val textStream = env.socketTextStream(hostname, port)
    // step 3: process the data
    val wordCountStream = textStream.flatMap(line => line.split(","))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum(1)
    // step 4: handle the result
    wordCountStream.print()
    // step 5: start the job
    env.execute("WindowWordCountScala")
  }
}
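Both versions read the hostname and port from the command line through ParameterTool, so the job has to be started with --hostname and --port arguments. A hedged example of submitting the Scala version on the cluster (the package name and jar name are placeholders borrowed from the earlier submit command; adjust them to your own project):
cd /kkb/install/flink-1.8.1/
bin/flink run --class com.kkb.flink.demo1.WindowWordCountScala /kkb/flink_day01-1.0-SNAPSHOT-jar-with-dependencies.jar --hostname node01 --port 9000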
Flink can also process data in batch mode with its batch API.
Requirement: process the attached count.txt file and count the words in it.
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object BatchOperate {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\count.txt"
    val outPut = "D:\\data\\result2"
    // get the program entry point ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)
    // import the implicit conversions
    import org.apache.flink.api.scala._
    val value: AggregateDataSet[(String, Int)] = text.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)
    value.writeAsText(outPut).setParallelism(1)
    env.execute("batch word count")
  }
}
Batch code (Java)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // step 1: get the batch program entry point
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "D:\\kkb\\flinklesson\\src\\main\\input\\hello.txt";
        // step 2: get the data source
        DataSource<String> dataSet = env.readTextFile(inputPath);
        // step 3: process the data
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOneDataSet = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        });
        AggregateOperator<Tuple2<String, Integer>> result = wordAndOneDataSet.groupBy(0)
                .sum(1);
        // step 4: handle the result
        result.writeAsText("D:\\kkb\\flinklesson\\src\\output\\result").setParallelism(1);
        // step 5: start the job
        env.execute("word count");
    }
}
An alternative way to write it
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // step 1: get the batch program entry point
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "D:\\kkb\\flinklesson\\src\\main\\input\\hello.txt";
        // step 2: get the data source
        DataSource<String> dataSet = env.readTextFile(inputPath);
        // step 3: process the data with the operator pulled out into its own class
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOneDataSet = dataSet.flatMap(new MySplitWordsTask());
        AggregateOperator<Tuple2<String, Integer>> result = wordAndOneDataSet.groupBy(0)
                .sum(1);
        // step 4: handle the result
        result.writeAsText("D:\\kkb\\flinklesson\\src\\output\\result1").setParallelism(1);
        // step 5: start the job
        env.execute("word count");
    }

    public static class MySplitWordsTask implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] fields = line.split(",");
            for (String word : fields) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
Summary: in this version the Flink operator is pulled out into a separate class, which makes the code easier to read.
To make development and debugging easier, Flink provides a shell command line for writing and running code interactively, similar to the Spark shell; it makes it convenient to follow the results of your code and track down problems.
The Flink shell supports both stream and batch processing. When the shell command line starts, two different ExecutionEnvironments are created automatically: senv for stream processing and benv for batch processing (similar to the sc variable in spark-shell).
Step 1: Enter Flink's scala-shell
Run the following commands on node01 to enter the scala-shell
cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh local
Alternatively, start the Flink cluster first and then attach the shell client to it, so that jobs are submitted to the cluster:
cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh remote node01 8081
Step 2: Use the benv variable for batch processing
In the scala-shell, debug the code using batch processing
val line = benv.fromElements("hello world","spark flink")
line.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1).print
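For the two sample lines above, the grouped result printed by the shell should contain one count per word, roughly like this (the order of the tuples may differ):
(hello,1)
(world,1)
(spark,1)
(flink,1)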
Implementing the same logic with the senv variable
Step 1: Start the nc -lk server on node01
Run the following command on node01 to start the server
[hadoop@node01 ~]$ nc -lk 9000
Step 2: Enter the scala-shell client
Run the following commands on node01 to enter the scala-shell
cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh local
Step 3: Use senv to count word occurrences
On node01, use the senv variable to count word occurrences in real time
senv.socketTextStream("node01",9000).flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1).print
senv.execute
Step 4: Send words from node01
Send words from the node01 server; some sample input and the output to expect are sketched below.
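As a rough illustration (the input lines here are made up, not part of the original material), typing the following into the nc session:
hello flink
hello spark
should make the scala-shell print running counts such as (hello,1), (flink,1), (hello,2) and (spark,1); the exact ordering and any subtask prefix depend on the runtime.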
For real-time processing, the data source in practice is usually Kafka, and Flink provides a dedicated Kafka connector for reading from and writing to Kafka topics.
When Flink consumes Kafka data, it does not guarantee exactly-once semantics purely by tracking the Kafka consumer group's offsets; instead, Flink tracks the offsets internally and checkpoints them to achieve exactly-once semantics. For the Kafka partitions, Flink runs a corresponding degree of parallelism to process the data of each partition.
The Flink–Kafka integration is described in the official Flink documentation.
Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
In practice, Kafka is usually used as the source for Flink.
Create the Kafka topic
Install and start the Kafka cluster, then run the following commands on node01 to create a Kafka topic named test
cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
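To confirm that the topic was created, list the topics against the same ZooKeeper quorum:
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181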
Code:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object FlinkKafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions
    import org.apache.flink.api.scala._
    // checkpoint configuration
    env.enableCheckpointing(100)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaSource: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    // configure the state backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka/checkpoints", true))

    val result: DataStream[String] = env.addSource(kafkaSource)
    result.print()
    env.execute()
  }
}
Produce data into Kafka
Run the following commands on node01 to produce data into Kafka from the shell command line
cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
Kafka can also be used as a Flink sink, i.e. the data that Flink has finished processing is written back into Kafka.
Code
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions
    import org.apache.flink.api.scala._
    // checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // configure the state backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val text = env.socketTextStream("node01", 9000)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "kafka_group1")
    // first option: set the transaction timeout on the FlinkKafkaProducer011
    // set the transaction timeout
    prop.setProperty("transaction.timeout.ms", 60000 * 15 + "")
    // second option: raise the maximum transaction timeout on the Kafka broker side
    // FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

    // use the constructor that supports exactly-once semantics
    val myProducer = new FlinkKafkaProducer011[String](topic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
    text.addSink(myProducer)
    env.execute("StreamingFromCollectionScala")
  }
}
Start the socket service and send data
Run the following command on node01 to send data to the socket service
nc -lk 9000
Start a Kafka consumer
Run the following command on node01 to start a Kafka consumer and consume the data
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092 --topic test
Flink can also integrate directly with HBase, using HBase as a Flink source, sink, and so on.
Step 1: Create the HBase table and insert some data
create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0002','f1:age','18'
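To double-check the inserted rows, scan the table from the HBase shell; both rowkeys should show up:
scan 'hbasesource'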
Step 2: Add the integration dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <!-- there is no 1.8.1 build of this artifact yet -->
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
Step 3: Write the Flink code
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkReadHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val hbaseData: DataSet[tuple.Tuple2[String, String]] = environment.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
      override def configure(parameters: Configuration): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        val conn: Connection = ConnectionFactory.createConnection(conf)
        table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
        scan = new Scan() {
          // setStartRow(Bytes.toBytes("1001"))
          // setStopRow(Bytes.toBytes("1004"))
          addFamily(Bytes.toBytes("f1"))
        }
      }

      override def getScanner: Scan = {
        scan
      }

      override def getTableName: String = {
        "hbasesource"
      }

      override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
        val rowkey: String = Bytes.toString(result.getRow)
        val sb = new StringBuffer()
        for (cell: Cell <- result.rawCells()) {
          val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          sb.append(value).append(",")
        }
        val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
        val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
        tuple2.setField(rowkey, 0)
        tuple2.setField(valueString, 1)
        tuple2
      }
    })
    // print() already triggers execution of the batch job, so no explicit execute() call is needed here
    hbaseData.print()
  }
}
Flink can also integrate with HBase to write data into HBase. There are two ways to do this:
1. Option 1: implement the OutputFormat interface
2. Option 2: extend RichSinkFunction and override its methods (a hedged sketch of this approach follows the OutputFormat example below)
import java.util

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkWriteHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("01,zhangsan,28", "02,lisi,30")
    sourceDataSet.output(new HBaseOutputFormat)
    environment.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "node01"
  val port = "2181"
  var conn: Connection = null

  override def configure(configuration: Configuration): Unit = {
  }

  override def open(i: Int, i1: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
  }

  override def writeRecord(it: String): Unit = {
    val tableName: TableName = TableName.valueOf("hbasesource")
    val cf1 = "f1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    val putList: util.ArrayList[Put] = new util.ArrayList[Put]
    putList.add(put)
    // use a 1 MB write buffer; when it fills up, the data is flushed to HBase automatically
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // set the buffer size
    params.writeBufferSize(1024 * 1024)
    val mutator: BufferedMutator = conn.getBufferedMutator(params)
    mutator.mutate(putList)
    mutator.flush()
    putList.clear()
  }

  override def close(): Unit = {
    if (null != conn) {
      conn.close()
    }
  }
}
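For option 2, here is a minimal sketch of a RichSinkFunction-based HBase sink. Note that RichSinkFunction is a streaming sink, so it is attached to a DataStream with addSink rather than used with the DataSet API; the class name HBaseSinkFunction, the reuse of the hbasesource table and f1 column family, and the "id,name,age" input format are assumptions made for illustration, not code from the original course material.
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// hypothetical sink: writes "id,name,age" strings into the hbasesource table
class HBaseSinkFunction extends RichSinkFunction[String] {
  private var conn: Connection = _
  private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    // open one HBase connection per parallel sink instance
    val config = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "node01")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(TableName.valueOf("hbasesource"))
  }

  override def invoke(value: String): Unit = {
    // same "id,name,age" layout as the OutputFormat example above
    val fields = value.split(",")
    val put = new Put(Bytes.toBytes(fields(0)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(fields(1)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(fields(2)))
    table.put(put)
  }

  override def close(): Unit = {
    // release the HBase resources when the sink shuts down
    if (table != null) table.close()
    if (conn != null) conn.close()
  }
}
In a streaming job it would be wired up with something like streamEnv.socketTextStream("node01", 9000).addSink(new HBaseSinkFunction).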