Getting Started with Flink Programming

1. Flink Getting-Started Examples


1.1. Real-Time Processing Code

(1) Count the words arriving on a socket

Step 1: Create a Maven project and add the dependencies

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>

</dependencies>
<build>
    <plugins>
        <!-- Plugin that pins the JDK version -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Plugin needed to compile Scala -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Plugin used to package the project -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Step 2: Write the Flink code that counts the words from the socket

Write Flink code that receives word data from the socket and aggregates the counts.

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

case class CountWord(word:String,count:Long)

object FlinkCount {

  def main(args: Array[String]): Unit = {
    //get the entry point for the streaming program
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //read the data from the socket
    val result: DataStream[String] = environment.socketTextStream("node01",9000)
    //import the implicit conversions; without them the operators below cannot be used
    import org.apache.flink.api.scala._
    //split the data, wrap it in the case class, then aggregate
    val resultValue: DataStream[CountWord] = result
      .flatMap(x => x.split(" "))
      .map(x => CountWord(x,1))
      .keyBy("word")
     // .timeWindow(Time.seconds(1),Time.milliseconds(1))  aggregate over a time window with the given window size and slide interval
      .sum("count")
    //print the final result
    resultValue.print().setParallelism(1)
    //start the job
    environment.execute()
  }
}

Step 3: Package the program and run it on the server
Package the program, upload the packaged jar to the node01 server, and try running it in the different deployment modes.
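Assuming the assembly plugin configuration shown above is bound to the package phase, building the fat jar can be done with a standard Maven command such as:

mvn clean package

The jar-with-dependencies artifact is then produced under the target/ directory.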

1. Run the program in standalone mode
Step 1: Start the Flink cluster
Run the following on node01 to start the Flink cluster

cd /kkb/install/flink-1.8.1
bin/start-cluster.sh

Step 2: Start the socket service on node01 and submit the Flink job
Run the following on node01 to start the socket service

nc -lk 9000

Submit the job
Upload the packaged jar to the /kkb directory on node01, then submit the job. Note that pom.xml must contain the packaging plugin shown above, and dependencies that are already present on the cluster should be declared with scope provided: mark the Flink dependencies as provided in pom.xml, build the jar-with-dependencies artifact, and upload it to /kkb on node01 (a sketch of such a declaration follows below).
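For reference, a minimal sketch of a provided-scope declaration (same artifact and version as the dependencies above; repeat for each Flink dependency):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.1</version>
    <!-- provided: the cluster already ships this jar, so it is not bundled into the fat jar -->
    <scope>provided</scope>
</dependency>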

Run the following on node01 to submit the job

cd /kkb/install/flink-1.8.1/
bin/flink run  --class com.kkb.flink.demo1.FlinkCount /kkb/flink_day01-1.0-SNAPSHOT-jar-with-dependencies.jar

Step 3: Check the results
Check the output on node01

cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-1-node01.kaikeba.com.out

Note: the results are written to the files ending in .out; check whichever .out file contains data.

(2) Every 1 second, count the words seen in the last 2 seconds

The pom dependencies are as follows:

    <properties>
        <flink.version>1.9.0</flink.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        </dependency>
    </dependencies>

Code (Java)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCountJava {
    public static void main(String[] args) throws Exception {
        //ParameterTool is a Flink utility class for reading the command-line arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        //step 1: get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //step 2: get the data source
        DataStream<String> dataStream = env.socketTextStream(hostname, port);
        //step 3: apply the processing logic
        DataStream<WordCount> wordAndOneStream = dataStream.flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String line, Collector<WordCount> out) {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new WordCount(word, 1L));
                }
            }
        });

        DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//every 1 second, compute over the last 2 seconds
                .sum("count");
        //step 4: print the result
        resultStream.print();
        //step 5: start the job
        env.execute("WindowWordCountJava");
    }

    public static class WordCount{
        public String word;
        public long count;
        //a no-argument constructor is required here (Flink POJO rules)
        public WordCount(){

        }
        public WordCount(String word,long count){
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}


Code (Scala)

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Sliding window:
  * every 1 second, count the data of the last 2 seconds and print it to the console.
  */
object WindowWordCountScala {
  def main(args: Array[String]): Unit = {
    //read the parameters
    val hostname = ParameterTool.fromArgs(args).get("hostname")
    val port = ParameterTool.fromArgs(args).getInt("port")
    //import the implicit conversions
    import org.apache.flink.api.scala._
    //step 1: get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //step 2: get the data source
    val textStream = env.socketTextStream(hostname,port)
    //step 3: process the data
    val wordCountStream = textStream.flatMap(line => line.split(","))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum(1)
    //step 4: handle the result
    wordCountStream.print()
    //step 5: start the job
    env.execute("WindowWordCountScala")
  }

}

1.2. Batch Processing Code

Flink can also process data in batch mode.

Requirement: process the attached count.txt file and count the words in it.

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object BatchOperate {
  def main(args: Array[String]): Unit = {

    val inputPath = "D:\\count.txt"
    val outPut = "D:\\data\\result2"

    //get the batch entry point, ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)

    //import the implicit conversions
    import org.apache.flink.api.scala._

    val value: AggregateDataSet[(String, Int)] = text.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1)
    value.writeAsText(outPut).setParallelism(1)

    env.execute("batch word count")
  }

}

Batch code (Java)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args)throws  Exception {
        //step 1: get the batch entry point
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath="D:\\kkb\\flinklesson\\src\\main\\input\\hello.txt";
        //step 2: get the data source
        DataSource<String> dataSet = env.readTextFile(inputPath);
        //step 3: process the data
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOneDataSet = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        });

        AggregateOperator<Tuple2<String, Integer>> result = wordAndOneDataSet.groupBy(0)
                .sum(1);
        //step 4: write the result
        result.writeAsText("D:\\kkb\\flinklesson\\src\\output\\result").setParallelism(1);
        //step 5: start the job
        env.execute("word count");
    }
}


An alternative way to write it

public class WordCount {
    public static void main(String[] args)throws  Exception {
        //step 1: get the batch entry point
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath="D:\\kkb\\flinklesson\\src\\main\\input\\hello.txt";
        //step 2: get the data source
        DataSource<String> dataSet = env.readTextFile(inputPath);
        //step 3: process the data
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOneDataSet = dataSet.flatMap(new MySplitWordsTask());

        AggregateOperator<Tuple2<String, Integer>> result = wordAndOneDataSet.groupBy(0)
                .sum(1);
        //step 4: write the result
        result.writeAsText("D:\\kkb\\flinklesson\\src\\output\\result1").setParallelism(1);
        //step 5: start the job
        env.execute("word count");
    }


    public static class MySplitWordsTask implements   FlatMapFunction<String,Tuple2<String,Integer>>{
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] fields = line.split(",");
            for (String word : fields) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Summary: in this version the Flink operator is extracted into its own class, which makes the code easier to read.

2. Debugging Code from the Flink Shell

To make development and debugging easier, Flink provides a shell (similar to the Spark shell) in which you can write and run code interactively and inspect the results to track down problems.
The Flink shell supports both stream and batch processing. When the shell starts, two ExecutionEnvironments are created automatically: use senv for streaming programs and benv for batch programs (analogous to the sc variable in spark-shell).

2.1. Debugging Batch Code

Step 1: Enter the Flink scala-shell
Run the following on node01 to enter the scala-shell

cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh local

Alternatively, start the Flink cluster first and attach the shell to it, so that the jobs are submitted to the cluster

cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh remote node01 8081

Step 2: Use the benv variable for batch processing
In the scala-shell, debug the code with the batch environment

val line =benv.fromElements("hello world","spark flink")
line.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1).print

2.2. Debugging Streaming Code

Implement the code with the senv variable

Step 1: Start the nc -lk server on node01
Run the following on node01 to start the server

[hadoop@node01 ~]$ nc -lk 9000

Step 2: Enter the scala-shell
Run the following on node01 to enter the scala-shell

cd /kkb/install/flink-1.8.1/
bin/start-scala-shell.sh local

Step 3: Use senv to count word occurrences
On node01, use the senv variable to count word occurrences in real time

senv.socketTextStream("node01",9000).flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1).print
senv.execute

Step 4: Send words from node01
Type words into the nc session on node01
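For example (illustrative only; the exact tuple formatting depends on the Flink version), typing the following lines into the nc session

hello world
hello flink

should make the scala-shell print running counts such as (hello,1), (world,1), (hello,2) and (flink,1).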

3. Integrating DataStream with Kafka

For real-time processing, the data source in practice is usually Kafka, and Flink ships a dedicated Kafka connector for reading from and writing to Kafka topics.

When Flink consumes Kafka data, exactly-once semantics are not guaranteed purely by tracking the Kafka consumer group's offsets; instead Flink tracks the offsets internally and stores them in its checkpoints. For the Kafka partitions, Flink runs a matching degree of parallelism so that each partition is processed.
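As a hedged illustration of this behavior, the sketch below (the object name is made up; broker, group id and topic match the example in section 3.1) shows the start-position options exposed by the Kafka consumer. Regardless of the choice, the offsets used for recovery are the ones stored in Flink's checkpoints.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Sketch: choosing where the Kafka consumer starts reading; Flink tracks the offsets in its checkpoints
object KafkaStartPositionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092") // assumption: same broker as the examples below
    props.setProperty("group.id", "con1")

    val consumer = new FlinkKafkaConsumer011[String]("test", new SimpleStringSchema(), props)
    consumer.setStartFromGroupOffsets()    // default: resume from the group's committed offsets
    // consumer.setStartFromEarliest()     // or ignore committed offsets and read from the beginning
    // consumer.setStartFromLatest()       // or read only new records
    consumer.setCommitOffsetsOnCheckpoints(true) // commit offsets back to Kafka only when a checkpoint completes

    env.addSource(consumer).print()
    env.execute("KafkaStartPositionSketch")
  }
}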

See the official Flink documentation on the Kafka connector for details.

Add the dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

3.1. Using Kafka as a Flink source

In practice, Kafka is usually the source for Flink jobs.

Create the Kafka topic
With the Kafka cluster installed and running, run the following on node01 to create a topic named test

cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181

Code:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object FlinkKafkaSource {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //implicit conversions
    import org.apache.flink.api.scala._
    //checkpoint configuration
    env.enableCheckpointing(100);
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig.setCheckpointTimeout(60000);
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers","node01:9092")
    prop.setProperty("group.id","con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    val kafkaSource: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)

    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    //configure the state backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka/checkpoints",true));
    val result: DataStream[String] = env.addSource(kafkaSource)
    result.print()
    env.execute()
  }
}

Produce data into Kafka

Run the following on node01 to produce data into Kafka from the command line

cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic  test

3.2. Using Kafka as a Flink sink

Kafka can also be used as a Flink sink: the data produced by the Flink job is written back into Kafka.
Code:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      //implicit conversions
      import org.apache.flink.api.scala._
      //checkpoint configuration
      env.enableCheckpointing(5000);
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
      env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
      env.getCheckpointConfig.setCheckpointTimeout(60000);
      env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);
      env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      //configure the state backend
      env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));


      val text = env.socketTextStream("node01",9000)

      val topic = "test"
      val prop = new Properties()
      prop.setProperty("bootstrap.servers","node01:9092")
      prop.setProperty("group.id","kafka_group1")


      //option 1: raise the transaction timeout used by FlinkKafkaProducer011
      //set the transaction timeout
      prop.setProperty("transaction.timeout.ms",60000*15+"");
      //option 2: raise the maximum transaction timeout on the Kafka broker side instead
      //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
      //use the constructor variant that supports exactly-once semantics
      val myProducer = new FlinkKafkaProducer011[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
      text.addSink(myProducer)
      env.execute("StreamingFromCollectionScala")
  }
}

Start the socket service and send data
Run the following on node01, then type data into the socket service

nc -lk  9000

Start a Kafka consumer
Run the following on node01 to start a Kafka console consumer and consume the data

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092 --topic test

4. Integrating DataSet with HBase

Flink can also integrate directly with HBase, using HBase as a Flink source, sink, and so on.

Step 1: Create the HBase table and insert some data

create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0002','f1:age','18'

Step 2: Add the integration dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
<!-- no 1.8.1 release of this artifact is available yet -->
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>

Step 3: Write the Flink code

4.1. Reading data from HBase with Flink

import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkReadHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val hbaseData: DataSet[tuple.Tuple2[String, String]] = environment.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
      override def configure(parameters: Configuration): Unit = {
        val conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        val conn: Connection = ConnectionFactory.createConnection(conf)
        table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
        scan = new Scan() {
          // setStartRow(Bytes.toBytes("1001"))
          // setStopRow(Bytes.toBytes("1004"))
          addFamily(Bytes.toBytes("f1"))
        }
      }
      override def getScanner: Scan = {
        scan
      }
      override def getTableName: String = {
        "hbasesource"
      }
      override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
        val rowkey: String = Bytes.toString(result.getRow)
        val sb = new StringBuffer()
        for (cell: Cell <- result.rawCells()) {
          val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          sb.append(value).append(",")
        }
        val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
        val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
        tuple2.setField(rowkey, 0)
        tuple2.setField(valueString, 1)
        tuple2
      }


    })
    hbaseData.print()
    environment.execute()
  }
}

4.2. Reading data with Flink and writing it to HBase

Flink can also integrate with HBase to write data into HBase.

1. Option 1: implement the OutputFormat interface
2. Option 2: extend RichSinkFunction and override its methods (a sketch of this variant follows the OutputFormat example below)

import java.util
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkWriteHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("01,zhangsan,28","02,lisi,30")
    sourceDataSet.output(new HBaseOutputFormat)
    environment.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String]{
  val zkServer = "node01"
  val port = "2181"
  var conn: Connection = null

  override def configure(configuration: Configuration): Unit = {

  }

  override def open(i: Int, i1: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
  }

  override def writeRecord(it: String): Unit = {
    val tableName: TableName = TableName.valueOf("hbasesource")
    val cf1 = "f1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    val putList: util.ArrayList[Put] = new util.ArrayList[Put]
    putList.add(put)
    //set a 1 MB write buffer; once it fills up, the data is flushed to HBase automatically
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    //set the buffer size
    params.writeBufferSize(1024 * 1024)
    val mutator: BufferedMutator = conn.getBufferedMutator(params)
    mutator.mutate(putList)
    mutator.flush()
    putList.clear()
  }
  override def close(): Unit = {
    if(null != conn){
      conn.close()
    }
  }
}
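
For the second option, here is a minimal sketch of a RichSinkFunction-based HBase sink. Note that RichSinkFunction belongs to the DataStream API, so this variant targets streaming jobs; the class name, the "rowkey,name,age" record layout and the connection settings are assumptions for illustration, reusing the hbasesource table and f1 column family from above.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Sketch: write each "rowkey,name,age" record into the hbasesource table via a RichSinkFunction
class HBaseRichSinkSketch extends RichSinkFunction[String] {
  private var conn: Connection = _
  private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    val config = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(TableName.valueOf("hbasesource"))
  }

  override def invoke(value: String): Unit = {
    val fields = value.split(",")
    val put = new Put(Bytes.toBytes(fields(0)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(fields(1)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(fields(2)))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (conn != null) conn.close()
  }
}

In a streaming job it would be attached with something like env.socketTextStream("node01", 9000).addSink(new HBaseRichSinkSketch).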