I have three virtual machines running; Hadoop, ZooKeeper (zk), and Flink need to be started on them. The startup commands are as follows:
```shell
start-all.sh        # Hadoop: HDFS + YARN
zkServer.sh start   # ZooKeeper (run on each node)
start-cluster.sh    # Flink standalone cluster
```
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyWordCount1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input file line by line.
        DataStreamSource<String> lineds = env.readTextFile("/flinkPro/workCount.txt");

        // Split each line into words and emit (word, 1) pairs. The lambda erases
        // the tuple's generic types, so they must be declared via returns().
        SingleOutputStreamOperator<Tuple2<String, Long>> wordstream = lineds
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Group by the word and sum the counts.
        KeyedStream<Tuple2<String, Long>, String> groupds = wordstream.keyBy(ds -> ds.f0);
        groupds.sum(1).print();   // .setParallelism() could be chained here if needed

        env.execute();
    }
}
```
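The commented-out `.setParallelism()` hints at per-operator parallelism. As a sketch of where it would go, here is a hypothetical variant (not from the original post) that reads from a socket instead of a file and pins the flatMap to two subtasks; host and port are placeholders, assuming something like `nc -lk 9999` is listening:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyWordCount2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9999)              // placeholder host/port
           .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
               for (String word : line.split(" ")) {
                   out.collect(Tuple2.of(word, 1L));
               }
           })
           .returns(Types.TUPLE(Types.STRING, Types.LONG))
           .setParallelism(2)                                // two parallel flatMap subtasks
           .keyBy(t -> t.f0)
           .sum(1)
           .print();
        env.execute("socket word count");
    }
}
```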
```java
// Inside the upload controller; fileService is @Autowired.
@RequestMapping(value = "/uploadToHdfs", method = RequestMethod.POST)
@ApiOperation(value = "HDFS file upload")
@ApiImplicitParams({
        @ApiImplicitParam(name = "file", value = "file")
})
public ResponseEntity<ApiResponse> uploadToHdfs(@RequestParam MultipartFile file) throws Exception {
    String originalFilename = file.getOriginalFilename();   // original file name
    boolean res = fileService.uploadHdfs(file, originalFilename);
    return response(res);
}
```
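The `fileService` field and the `response(...)` helper belong to the surrounding controller class, which the post does not show. Judging from the call site and the `FileServiceImpl` below, the service interface is presumably just:

```java
import org.springframework.web.multipart.MultipartFile;

// Reconstructed from the call site above and FileServiceImpl below;
// the original post omits this interface.
public interface FileService {
    boolean uploadHdfs(MultipartFile file, String fileName);
}
```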
```java
@Service
@Slf4j
public class FileServiceImpl implements FileService {

    @Autowired
    private HdfsUtil hdfsUtil;

    @Autowired
    private HdfsConfig hdfsConfig;

    /**
     * @param file     the file uploaded from the front end
     * @param fileName the file name
     * @return true if the file exists on HDFS after the upload
     */
    @Override
    public boolean uploadHdfs(MultipartFile file, String fileName) {
        boolean res = false;
        try {
            String hdfsPath = hdfsConfig.getNameNodeUrl();
            hdfsUtil.createFile(hdfsPath + fileName, file, fileName);
            // Note: the write uses nameNodeUrl as the prefix while the existence
            // check uses hdfsPath; both must resolve to the same HDFS location.
            res = hdfsUtil.existFile(hdfsConfig.getHdfsPath() + fileName);
        } catch (Exception e) {
            log.error("File upload failed: {}", e.getMessage());
        }
        return res;
    }
}
```
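`HdfsUtil` is not shown in the post either. Below is a minimal sketch, with method signatures inferred from the call sites above and connection details taken from `HdfsConfig`; treat it as an assumption about what the real class does, not the post's actual implementation:

```java
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

// Hypothetical reconstruction of HdfsUtil; signatures inferred from
// FileServiceImpl's call sites. Not shown in the original post.
@Component
public class HdfsUtil {

    private final HdfsConfig hdfsConfig;

    public HdfsUtil(HdfsConfig hdfsConfig) {
        this.hdfsConfig = hdfsConfig;
    }

    // FileSystem.get caches instances per URI/user, so the handle is not closed here.
    private FileSystem fileSystem() throws Exception {
        return FileSystem.get(new URI(hdfsConfig.getNameNodeUrl()),
                new Configuration(), hdfsConfig.getHdfsUserName());
    }

    // Write the uploaded bytes to the target HDFS path.
    // fileName is unused because the caller already appends it to the path.
    public void createFile(String path, MultipartFile file, String fileName) throws Exception {
        try (OutputStream out = fileSystem().create(new Path(path))) {
            out.write(file.getBytes());
        }
    }

    // True if a file exists at the given path (resolved against fs.defaultFS
    // when the path is not a full hdfs:// URL).
    public boolean existFile(String path) throws Exception {
        return fileSystem().exists(new Path(path));
    }
}
```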
```java
@Configuration
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HdfsConfig {

    // HDFS NameNode connection URL
    @Value("${nameNode.url}")
    private String nameNodeUrl;

    // user to operate as
    @Value("${hdfs.userName}")
    private String hdfsUserName;

    // DataNode address
    @Value("${hdfs.dataNode}/")
    private String pdfDataNode;

    // HDFS storage path
    @Value("${nameNode.hdfsPath}")
    private String hdfsPath;

    @Value("${user.root}")
    private String userRoot;

    @Value("${hadoop.name}")
    private String hadoopName;
}
```
application.yml configuration:

```yaml
nameNode:
  url: hdfs://ip:port/hdfs/WangAndLi/
  hdfsPath: /          # HDFS storage path
hdfs:
  userName: root
  dataNode: ip:port    # DataNode address
# access-path settings
user:
  root: /hdfs/WangAndLi
hadoop:
  name: fs.defaultFS
security:
  user: root
```
Go into Flink's bin directory and submit the job:

```shell
./flink run -c com.xxx.flink.wordCount.MyWordCount1 /jar
```

where "/jar" is your HDFS path to the job JAR.