当前位置:   article > 正文

Flink流计算学习 一_flink 如何进行计算

flink 如何进行计算

一、flink是什么?

Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,能够支持流处理和批处理两种应用类型。由于流处理和批处理所提供的SLA(服务等级协议)是完全不相同, 流处理一般需要支持低延迟、Exactly-once保证(恰好执行一次),而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。比较典型的有:实现批处理的开源方案有MapReduce、Spark;实现流处理的开源方案有Storm;Spark的Streaming 其实本质上也是微批处理。

二、使用步骤

flink中可以将本地文件,hadoop的hdfs,kafka等作为数据源,在这里我将用hadoop中的hdfs来作为数据源实现。

1.安装hadoop

我是在vmware中一台linux虚拟机来进行试验的,下面来描述具体过程。
首先在虚拟机中创建hadoop文件夹,并且下载并解压hadoop的jar包。

cd /home
mkdir hadoop
cd hadoop
wget http://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz
tar -xvf hadoop-2.8.3.tar.gz
  • 1
  • 2
  • 3
  • 4
  • 5

2.配置文件

配置文件的路径为:/home/hadoop/hadoop-2.8.3/etc/hadoop
core-site.xml

<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://192.168.1.11:9000</value>
    </property>
</configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

mapred-site.xml 解压出来的后面会多出template,删掉即可

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>192.168.1.11:9001</value>
    </property>
</configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

配置已经结束,接下来使用如下命令格式化Hadoop的文件系统HDFS

cd /home/hadoop/hadoop-2.8.3/bin
./hadoop namenode -format
  • 1
  • 2

接下来就可以去hadoop下的sbin目录,运行启动命令,将hadoop跑起来。

cd ../sbin
./start-all.sh
  • 1
  • 2

运行成功后,就可以通过一下地址访问hadoop与HDFS了。
http://192.168.1.11:8088 (MapReduce的Web页面)
http://192.168.1.11:50070 (HDFS的Web页面)

如果访问不了,就是端口没开放。

同时,vmware虚拟机的网络,最好选择桥接模式,这样重启虚拟机时,IP就不会频繁变动。
在这里插入图片描述

3.创建测试文件

计算要有数据源,所以需要去HDFS中创建一个文件,并且开启权限。

cd ../bin
hdfs dfs -touchz /wc.txt
echo "hello word flink oh oh" | ./hdfs dfs -appendToFile - /wc.txt 
./hdfs dfs -chmod -R 777 /
  • 1
  • 2
  • 3
  • 4

4.实现代码

这里是以scala语言编写的,关于IDEA怎么集成scala自行百度。

导入依赖

	<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins> <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution> <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

创建文件

在这里插入图片描述
在这里插入图片描述

实现代码

package source

import java.net.URL

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FileSource {

  def main(args: Array[String]): Unit = {
    //初始化上下文
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment;
    //设置并行度(任务可以分散到几个slot运行)
    streamEnv.setParallelism(1);
    //有时代码提示出不来,可以在函数中导入,就会自动提示了
    import org.apache.flink.streaming.api.scala._
    //从hdfs上读取数据源
    val stream: DataStream[String] = streamEnv.readTextFile("hdfs://zjj1:9000/wc.txt")

    val result: DataStream[(String, Int)] = stream
      .flatMap(_.split(" "))//flatMap 读取到的数据按空格分割成为一个数组
      .map((_, 1))//数组中每个元素拆分为一个键值对 key为本身 value为1
      .keyBy(0)//通过key来分组 0是key 1是value
      .sum(1)//用下标为1(即value)的值来计算累加

    result.print();//打印结果

    //执行任务,流计算不执行,不会有结果
    streamEnv.execute("readHdfs");
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

文件中的内容:
通过hdfs dfs -cat /wc.txt来查看文件中的内容
在这里插入图片描述

输出结果:
在这里插入图片描述

三、结语

今天的学习就到这里了,加油!

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号