Introduction: Flink and Spark are two mainstream big data processing frameworks, but they differ in their data processing models, execution engines, and typical use cases. This article compares Flink and Spark in depth, looks at the scenarios each is suited to, and uses code examples to illustrate how they are used and where their respective strengths and weaknesses lie.

We start with the classic WordCount, first written as a batch job with Flink's DataSet API:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkWordCount {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
                "To be, or not to be, that is the question",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
                .groupBy(0)
                .sum(1);

        counts.print();
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
```
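The DataSet program above runs as a bounded batch job. Flink's native model, however, is stream processing, which is where it differs most from Spark. The following is a minimal streaming WordCount sketch, not part of the original example: the socket source on localhost:9999 is only a placeholder for a real connector such as Kafka.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkStreamingWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A socket source is used here purely as a stand-in for a real connector such as Kafka
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> counts = lines
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String token : line.toLowerCase().split("\\W+")) {
                        if (!token.isEmpty()) {
                            out.collect(new Tuple2<>(token, 1));
                        }
                    }
                })
                // Lambdas lose generic type information, so declare the result type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        // Counts are emitted incrementally and keep updating as new lines arrive
        counts.print();

        env.execute("Flink Streaming WordCount");
    }
}
```

Unlike the batch version, the counts here are updated continuously as records arrive, rather than being produced once at the end of the job.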
For comparison, the same WordCount implemented with Spark's RDD API:

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkWordCount {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkWordCount")
                .master("local[*]")
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<String> lines = jsc.parallelize(Arrays.asList(
                "To be, or not to be, that is the question",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        ));

        // Split lines into lower-case words and drop empty tokens
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.toLowerCase().split("\\W+")).iterator());
        JavaRDD<String> filteredWords = words.filter(word -> word.length() > 0);

        // Count each word and format the result as "word: count"
        JavaRDD<String> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum)
                .map(tuple -> tuple._1 + ": " + tuple._2);

        // Collect to the driver and print (fine here because the data set is tiny)
        wordCounts.collect().forEach(System.out::println);

        spark.stop();
    }
}
```
When it comes to Flink SQL and Spark SQL, both offer a way to query and process data using SQL. The following examples show simple SQL-based processing, first in Flink and then in Spark:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create a DataStream (a real job would typically read from Kafka; here we use in-memory elements)
        DataStream<Order> orders = env.fromElements(
                new Order(1, "product-1", 100),
                new Order(2, "product-2", 150),
                new Order(3, "product-3", 200)
        );

        // Register the DataStream as a table
        tableEnv.createTemporaryView("Orders", orders, $("orderId"), $("productName"), $("amount"));

        // Query with SQL
        Table result = tableEnv.sqlQuery(
                "SELECT productName, SUM(amount) AS totalAmount FROM Orders GROUP BY productName");

        // Convert the result to a retract stream and print it
        tableEnv.toRetractStream(result, Row.class).print();

        env.execute("Flink SQL Example");
    }

    // Order POJO (public fields plus a no-arg constructor so Flink recognizes it as a POJO)
    public static class Order {
        public int orderId;
        public String productName;
        public int amount;

        public Order() {
        }

        public Order(int orderId, String productName, int amount) {
            this.orderId = orderId;
            this.productName = productName;
            this.amount = amount;
        }
    }
}
```
The same aggregation expressed with Spark SQL:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkSQLExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkSQLExample")
                .master("local[*]")
                .getOrCreate();

        // Define the schema of the Orders table
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("orderId", DataTypes.IntegerType, false),
                DataTypes.createStructField("productName", DataTypes.StringType, false),
                DataTypes.createStructField("amount", DataTypes.IntegerType, false)
        });

        // Create a DataFrame from an in-memory list of rows
        List<Row> rows = Arrays.asList(
                RowFactory.create(1, "product-1", 100),
                RowFactory.create(2, "product-2", 150),
                RowFactory.create(3, "product-3", 200)
        );
        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Register a temporary view
        df.createOrReplaceTempView("Orders");

        // Query with SQL
        Dataset<Row> result = spark.sql(
                "SELECT productName, SUM(amount) AS totalAmount FROM Orders GROUP BY productName");

        // Show the result
        result.show();

        spark.stop();
    }
}
```
In our projects we frequently use Spark SQL in exactly this way, running aggregations over offline data to produce the summary figures behind overview and dashboard pages.
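As a rough sketch of that pattern, the job below aggregates a day-partitioned fact table into per-product totals that an overview page could read. The orders table, its dt / product_name / amount columns, and the output path are all hypothetical here, not taken from a real project:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DailyOverviewJob {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DailyOverviewJob")
                .enableHiveSupport()   // assumes the offline data is registered in a Hive metastore
                .getOrCreate();

        // Hypothetical "orders" table partitioned by dt, with product_name and amount columns
        Dataset<Row> overview = spark.sql(
                "SELECT dt, product_name, " +
                "       COUNT(*)    AS order_cnt, " +
                "       SUM(amount) AS total_amount " +
                "FROM orders " +
                "WHERE dt >= '2024-01-01' " +
                "GROUP BY dt, product_name");

        // Persist the summary for the reporting/overview layer to read (illustrative path)
        overview.write().mode("overwrite").parquet("/warehouse/overview/daily_product_summary");

        spark.stop();
    }
}
```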
Flink and Spark are both powerful big data processing frameworks, each with its own characteristics and suitable scenarios. The comparison in this article should give you a deeper understanding of both so that you can choose the framework that fits your needs; knowing the strengths and weaknesses of each also helps you apply them more effectively to big data processing and real-time computing.