当前位置:   article > 正文

flink的源码是java还是scala_Flink入门(一)(Java和scala)

flink源码是java写的还是scala写的

Flink批处理

从文件中读取单词,计算频次

Scala版本

import org.apache.flink.api.scala.ExecutionEnvironment

/** Batch word count: reads a text file and prints how often each tab-separated word occurs. */
object BatchWordCountJob {

  /**
   * Entry point of the batch job.
   *
   * Reads the input file line by line, lower-cases every line, splits it on tab
   * characters, drops empty tokens and prints the number of occurrences per word.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Brings the implicit TypeInformation instances needed by flatMap/map into scope.
    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = "/Users/mac126/Desktop/hello.txt"
    val text = env.readTextFile(input)

    text
      .flatMap(_.toLowerCase().split("\t")) // one record per tab-separated token
      .filter(_.nonEmpty)                   // skip empty tokens (e.g. from blank lines)
      .map((_, 1))                          // (word, 1)
      .groupBy(0)                           // group by the word (tuple field 0)
      .sum(1)                               // add up the counts (tuple field 1)
      .print()                              // the job defines no other sink and never calls env.execute()
  }
}

最后可在控制台看到打印的内容

Java版本

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**

* java开发wordcount

*/

public class BatchWordCountJob {

public static void main(String[] args) throws Exception{

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

String file="/Users/mac126/Desktop/hello.txt";

DataSettext = env.readTextFile(file);

text.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String value, Collector> collector) throws Exception {

String [] tokens = value.toLowerCase().split("\t");

for (String token :tokens){

if (token.trim().length()>0){

collector.collect(new Tuple2(token,1));

}

}

}

}).groupBy(0).sum(1).print();

}

}

Flink流处理

Flink 入门示例通常使用 socket 数据作为流式数据源。在 Mac 的命令行中执行 `nc -lk 9000` 开启 socket 发送端,然后在 Flink 中按如下方式接收 socket 消息:

Scala版本

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.api.windowing.time.Time

/** Streaming word count: counts comma-separated words received on a local socket in 5-second windows. */
object StreamWordCountJob {

  /**
   * Entry point of the streaming job.
   *
   * Connects to a text socket on localhost:9000, lower-cases each line, splits it
   * on commas, drops empty tokens and prints the per-word count of every
   * 5-second time window.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Brings the implicit TypeInformation instances needed by flatMap/map into scope.
    import org.apache.flink.api.scala._

    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9000)

    text
      .flatMap(_.toLowerCase().split(","))  // lower-case so "Word" and "word" are counted together
      .filter(_.nonEmpty)                   // "".split(",") yields one empty token; do not count it as a word
      .map((_, 1))                          // (word, 1)
      .keyBy(0)                             // key by the word (tuple field 0)
      .timeWindow(Time.seconds(5))
      .sum(1)                               // add up the counts (tuple field 1)
      .print()
      .setParallelism(1)                    // run the print sink with parallelism 1

    env.execute("StreamWordCountJob")
  }
}

Java版本

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

public class StreamWordCountJob {

public static void main(String[] args) throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSourcetext = env.socketTextStream("localhost",9000);

text.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String value, Collector> collector) throws Exception {

String [] tokens = value.toLowerCase().split(",");

for (String token:tokens){

if (token.length()>0){

collector.collect(new Tuple2(token,1));

}

}

}

}).keyBy(0).sum(1).print().setParallelism(1);

env.execute("StreamWordCountJob");

}

}

以上就是Flink入门的第一个小例子。大家可以在自己电脑上试下。

前置pom文件

一般在IDEA上安装Scala插件进行开发

scala版本 pom文件

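<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>my.flink.quickstart</groupId>
  <artifactId>quickstart</artifactId>
  <version>0.1</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart Job</name>
  <url>http://www.myorganization.org</url>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.8.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>my.flink.quickstart.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.8</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <projectnatures>
            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
          </projectnatures>
          <buildcommands>
            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <classpathContainers>
            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
          </classpathContainers>
          <excludes>
            <exclude>org.scala-lang:scala-library</exclude>
            <exclude>org.scala-lang:scala-compiler</exclude>
          </excludes>
          <sourceIncludes>
            <sourceInclude>**/*.scala</sourceInclude>
            <sourceInclude>**/*.java</sourceInclude>
          </sourceIncludes>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>add-dependencies-for-IDEA</id>
      <activation>
        <property>
          <name>idea.version</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>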

Java版本pom文件为

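<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>my-flink-project</groupId>
  <artifactId>my-flink-project</artifactId>
  <version>0.1</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart Job</name>
  <url>http://www.myorganization.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.6.2</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>myflink.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.0.0,)</versionRange>
                    <goals>
                      <goal>shade</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>add-dependencies-for-IDEA</id>
      <activation>
        <property>
          <name>idea.version</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>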

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/972463
推荐阅读
相关标签
  

闽ICP备14008679号