赞
踩
flink程序的数据流api处理:
一个Flink程序就是对DataStream API的各种转换。分为几部分:
1.获取执行环境(esecution environment)
2.读取数据源(source)
3.定义基于数据的转换操作(transformations)
4.定义计算结果的输出位置(sink)
5.触发程序执行(execute)
//获取执行环境是StreamExecutionEnvironment类的对象
三种方法:
1.直接调用getExecutionEnvironment方法,根据运行方式返回运行环境
2.createLocalEnvironment方法,传入并行度参数,默认为CPU核心数
3.createRemoteEnvironment返回集群执行环境,指定jobmanager的主机名和端口号,指定jar包
bin/flink run -Dexecution.runtime-mode=BATCH ...
Flink事件驱动的(Lazy exection):显式调用execute()方法,触发程序执行,完成后返回jobExecutionResult.
Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:
DataStream<String> stream = env.addSource(...);
方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource。这里的
DataStreamSource 类继承自 SingleOutputStreamOperator 类,又进一步继承自 DataStream。所以
很明显,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)
最简单的读取数据的方式,就是在代码中直接创建一个Java 集合,然后调用执行环境的fromCollection 方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
代码如下(示例):
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ArrayList<Event> clicks = new ArrayList<>();
clicks.add(new Event("Mary","./home",1000L));
clicks.add(new Event("Bob","./cart",2000L));
DataStream<Event> stream = env.fromCollection(clicks);
stream.print();
env.execute();
}
DataStream<String> stream = env.readTextFile("clicks.csv");
吞吐量小,稳定性差,一般用于测试
DataStream<String> stream = env.socketTextStream("localhost", 7777);
Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我们只能采用通用的addSource 方式、实现一个 SourceFunction 了;Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的SourceFunction
依赖
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
然后env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class SourceKafkaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "hadoop102:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>( "clicks", new SimpleStringSchema(), properties )); stream.print("Kafka"); env.execute(); } }
创建 FlinkKafkaConsumer 时需要传入三个参数:
1.第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic
列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据
时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
2. 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消
息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中
使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数
组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是
公共接口,所以我们也可以自定义反序列化逻辑。
3.第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性
1.定义数据源
import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Calendar; import java.util.Random; public class ClickSource implements SourceFunction<Event> { // 声明一个布尔变量,作为控制数据生成的标识位 private Boolean running = true; @Override public void run(SourceContext<Event> ctx) throws Exception { Random random = new Random(); // 在指定的数据集中随机选取数据 78 String[] users = {"Mary", "Alice", "Bob", "Cary"}; String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"}; while (running) { ctx.collect(new Event( users[random.nextInt(users.length)], urls[random.nextInt(urls.length)], Calendar.getInstance().getTimeInMillis() )); // 隔 1 秒生成一个点击事件,方便观测 Thread.sleep(1000); } } @Override public void cancel() { running = false; } }
2.调用addsource
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SourceCustom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//有了自定义的 source function,调用 addSource 方法
DataStreamSource<Event> stream = env.addSource(new ClickSource());
stream.print("SourceCustom");
env.execute();
}
}
DataStream API的前两步操作:执行环境和数据源的内容(转换算子和输出算子下期见)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。