
How to set up Flink on your machines and develop a Flink program

1. Start the virtual machines

I run three virtual machines; Hadoop, ZooKeeper (zk), and Flink all need to be started.
The startup commands and their results are shown below.

1.1 Start Hadoop's HDFS and YARN

start-all.sh

1.2 Start ZooKeeper (run this on each node)

zkServer.sh start

1.3 Start Flink

start-cluster.sh

1.4 Final startup result

(Screenshot of the final startup result omitted.)
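A quick way to verify the startup is to run jps on each node: you should see Hadoop's NameNode/DataNode (plus ResourceManager/NodeManager for YARN), ZooKeeper's QuorumPeerMain, and Flink's StandaloneSessionClusterEntrypoint and TaskManagerRunner processes.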

2. Develop the Flink program

A simple streaming WordCount: read a text file, split each line into words, and count occurrences per word.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyWordCount1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input file line by line
        DataStreamSource<String> lineds = env.readTextFile("/flinkPro/workCount.txt");

        // Split each line into words and emit (word, 1) pairs; the lambda
        // erases generic types, so declare them explicitly with returns()
        SingleOutputStreamOperator<Tuple2<String, Long>> wordstream = lineds
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Group by the word (field f0) and sum the counts (tuple field 1)
        KeyedStream<Tuple2<String, Long>, String> groupds = wordstream.keyBy(ds -> ds.f0);
        groupds.sum(1).print();

        env.execute();
        // Call setParallelism(n) on env or on individual operators to tune parallelism
    }
}
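As a hypothetical example, if workCount.txt contained the line `hello flink hello`, the printed stream would show the running counts `(hello,1)`, `(flink,1)`, `(hello,2)`. Note that readTextFile also accepts a full HDFS URI such as `hdfs://<namenode>:<port>/flinkPro/workCount.txt`, which is why the file is uploaded to HDFS in the next step.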

3. Upload the /flinkPro/workCount.txt file to HDFS

3.1 Controller code

@RequestMapping(value = "/uploadToHdfs", method = RequestMethod.POST)
@ApiOperation(value = "HDFS file upload")
@ApiImplicitParams({
        @ApiImplicitParam(name = "file", value = "file")
})
public ResponseEntity<ApiResponse> uploadToHdfs(@RequestParam MultipartFile file) throws Exception {
    String originalFilename = file.getOriginalFilename(); // original file name
    boolean res = fileService.uploadHdfs(file, originalFilename);
    return response(res);
}
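With the application running, the endpoint can be exercised from the Swagger UI generated by the @ApiOperation annotations, or with any HTTP client that POSTs a multipart/form-data request with a file part to /uploadToHdfs.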

3.2 ServiceImpl code

@Service
@Slf4j
public class FileServiceImpl implements FileService {

    @Autowired
    private HdfsUtil hdfsUtil;
    @Autowired
    private HdfsConfig hdfsConfig;

    /**
     * @param file     the file sent from the frontend
     * @param fileName the file name
     * @return true if the file exists in HDFS after the upload
     */
    @Override
    public boolean uploadHdfs(MultipartFile file, String fileName) {
        boolean res = false;
        try {
            // Write the file under the NameNode URL, then verify it exists;
            // nameNodeUrl and hdfsPath must resolve to the same directory
            String hdfsPath = hdfsConfig.getNameNodeUrl();
            hdfsUtil.createFile(hdfsPath + fileName, file, fileName);
            res = hdfsUtil.existFile(hdfsConfig.getHdfsPath() + fileName);
        } catch (Exception e) {
            log.error("File upload failed: {}", e.getMessage());
        }

        return res;
    }
}
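The HdfsUtil class used above is not shown in the original article. The following is a minimal sketch of what it might look like, assuming Hadoop's FileSystem API and the HdfsConfig properties shown below; the method signatures (createFile, existFile) match the calls above, but everything else is an assumption, not the author's actual implementation.

```java
// Minimal sketch only: HdfsUtil is not shown in the original article.
// Assumes Hadoop's FileSystem API; signatures match the calls in FileServiceImpl.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

@Component
public class HdfsUtil {

    @Autowired
    private HdfsConfig hdfsConfig;

    // Open a FileSystem handle against the configured NameNode as the configured user
    private FileSystem getFileSystem() throws Exception {
        Configuration conf = new Configuration();
        conf.set(hdfsConfig.getHadoopName(), hdfsConfig.getNameNodeUrl()); // fs.defaultFS
        return FileSystem.get(new URI(hdfsConfig.getNameNodeUrl()), conf,
                hdfsConfig.getHdfsUserName());
    }

    // Write the uploaded file's bytes to the given HDFS path
    public void createFile(String path, MultipartFile file, String fileName) throws Exception {
        try (FileSystem fs = getFileSystem();
             FSDataOutputStream out = fs.create(new Path(path))) {
            out.write(file.getBytes());
        }
    }

    // Check whether a file exists at the given HDFS path
    public boolean existFile(String path) throws Exception {
        try (FileSystem fs = getFileSystem()) {
            return fs.exists(new Path(path));
        }
    }
}
```

Opening a fresh FileSystem handle per call keeps the sketch simple; a real implementation might cache the handle instead.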

3.3 HdfsConfig code

@Configuration
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HdfsConfig {
    // HDFS NameNode connection URL
    @Value("${nameNode.url}")
    private String nameNodeUrl;

    // operating user
    @Value("${hdfs.userName}")
    private String hdfsUserName;

    // DataNode address
    @Value("${hdfs.dataNode}/")
    private String pdfDataNode;

    // HDFS storage path
    @Value("${nameNode.hdfsPath}")
    private String hdfsPath;

    @Value("${user.root}")
    private String userRoot;

    @Value("${hadoop.name}")
    private String hadoopName;

}

The corresponding application.yml configuration:

```yaml
nameNode:
  url: hdfs://ip:port/hdfs/WangAndLi/
  hdfsPath: /          # HDFS storage path
hdfs:
  userName: root
  dataNode: ip:port    # DataNode address
  # access path settings
user:
  root: /hdfs/WangAndLi
hadoop:
  name: fs.defaultFS
security:
  user: root
```
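Each @Value placeholder in HdfsConfig binds to one of these keys. Replace ip:port with your NameNode's actual address; hadoop.name holds the Hadoop property name fs.defaultFS, presumably used when building the Hadoop Configuration (as in the HdfsUtil sketch above).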

4. Package the project into a jar and upload it to HDFS as well

5. Run the program

Go to Flink's bin directory and submit the job, passing the entry class with the -c/--class flag:
./flink run -c com.xxx.flink.wordCount.MyWordCount1 /jar
where "/jar" is your HDFS path to the jar.
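Note that in most setups flink run expects a local filesystem path to the jar; if the jar was uploaded to HDFS as in step 4, fetch it first with `hdfs dfs -get <hdfs-path> .` and pass the resulting local path.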