1.19.10. Flink SQL project examples
1.19.10.1. Writing the pom.xml file
1.19.10.2. Java examples
1.19.10.3. Example 1: Flink batch processing (creating tables, querying, inserting, using the JDBC connector)
1.19.10.4. Example 2: Using StreamTableEnvironment + simulated data + user-defined functions and their application
1.19.10.5. Example 3: StreamTableEnvironment usage + blink/flink planner usage
1.19.10.6. Example 4: Flink SQL stream + window computation
1.19.10.7. Example 5: Converting a DataSet into a Table (word count)
1.19.10.8. Example 6: Converting a Table into a DataSet
1.19.10.11. Simulated table data + user-defined functions
1.19.10.12. Stream SQL + switching between the blink/flink planners
1.19.10.13. StreamTableExample
1.19.10.14. TPCHQuery3Table
1.19.10.15. WordCountSQL
1.19.10.16. WordCountTable
1.19.10.17. SQL examples
1.19.10.18. Flink: writing a single Kafka stream to MySQL
1.19.10.19. Flink: joining two Kafka streams and writing to MySQL
1.19.10.20. Flink: joining Kafka with a MySQL dimension table in real time and writing to MySQL
1.19.10.21. Flink tumbling window example
1.19.10.22. Flink sliding window example
1.19.10.23. MySQL CDC and Elasticsearch connectors
1.19.10.24. Flink datagen example
1.19.10.25. Upsert Kafka SQL connector
1.19.10.26. Flink Elasticsearch connector examples
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.toto.test</groupId> <artifactId>flink-sql-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--maven properties --> <maven.test.skip>false</maven.test.skip> <maven.javadoc.skip>false</maven.javadoc.skip> <!-- compiler settings properties --> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <flink.version>1.12.0</flink.version> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> </properties> <distributionManagement> <repository> <id>releases</id> <layout>default</layout> <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url> </repository> <snapshotRepository> <id>snapshots</id> <name>snapshots</name> <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url> </snapshotRepository> </distributionManagement> <repositories> <repository> <id>releases</id> <layout>default</layout> <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url> </repository> <repository> <id>snapshots</id> <name>snapshots</name> <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url> <snapshots> <enabled>true</enabled> <updatePolicy>always</updatePolicy> <checksumPolicy>warn</checksumPolicy> </snapshots> </repository> <repository> <id>xxxx</id> <name>xxxx</name> <url>http://xxx.xxx.xxx/nexus/content/repositories/xxxx/</url> </repository> <repository> <id>public</id> <name>public</name> <url>http://xxx.xxx.xxx/nexus/content/groups/public/</url> </repository> <!-- 新加 --> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- 取决于你使用的编程语言,选择Java或者Scala API来构建你的Table API和SQL程序 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <!-- 如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner --> <!-- Either... (for the old planner that was available before Flink 1.9) --> <!-- 如果遇到:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl问题,解决办法是去掉: <scope>provided</scope> --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <!-- or.. 
(for the new Blink planner) --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <!-- 内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序: --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <!-- 如果你想实现自定义格式来解析Kafka数据,或者自定义函数,下面的依赖就足够了,编译出来的jar文件可以直接给SQL Client使用 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <!--***************************** scala依赖 *************************************--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <!--***************************** 用jdbc connector 的时候使用*************************--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.12.0</version> </dependency> </dependencies> <build> <finalName>flink-sql-demo</finalName> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.0</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> <encoding>UTF-8</encoding> <compilerVersion>${maven.compiler.source}</compilerVersion> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.12.4</version> <configuration> <skipTests>${maven.test.skip}</skipTests> </configuration> </plugin> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <version>0.12</version> <configuration> <excludes> <exclude>README.md</exclude> </excludes> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>2.10.4</version> <configuration> <aggregate>true</aggregate> <reportOutputDirectory>javadocs</reportOutputDirectory> <locale>en</locale> </configuration> </plugin> <!-- scala编译插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.1.6</version> <configuration> <scalaCompatVersion>2.11</scalaCompatVersion> <scalaVersion>2.11.12</scalaVersion> <encoding>UTF-8</encoding> </configuration> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- 打jar包插件(会包含所有依赖) 
--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- 可以设置jar包的入口类(可选) --> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <!--<plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <args> <arg>-nobootcp</arg> </args> <!– 解决Error:(55, 38) Static methods in interface require -target:jvm-1.8问题 –> <addScalacArgs>-target:jvm-1.8</addScalacArgs> </configuration> </plugin>--> </plugins> </build> </project>
Prerequisite (create the MySQL tables):
CREATE TABLE `stu` (
  `name` varchar(60) DEFAULT NULL,
  `speciality` varchar(60) DEFAULT NULL,
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;

INSERT INTO stu (name,speciality,id) VALUES
  ('张三','美术',1),
  ('张三','音乐',2),
  ('李四','篮球',3),
  ('小明','美术',4),
  ('李四','美术',5),
  ('小明','音乐',6),
  ('赵六','数学',7),
  ('张三','美术',8),
  ('张三','音乐',9),
  ('李四','篮球',10);

INSERT INTO stu (name,speciality,id) VALUES
  ('小明','美术',11),
  ('李四','美术',12),
  ('小明','音乐',13),
  ('田七','语文',14);

CREATE TABLE `stu_tmp` (
  `name` varchar(60) DEFAULT NULL,
  `num` bigint(20) DEFAULT '0',
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
package flinksqldemo; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; /** * @author tuzuoquan * @version 1.0 * @ClassName flinksqldemo.BatchCreateSelectInsertDemoWithJdbcConnector * @description * 1、引包:pom.xml中使用Blink的 * 2、流式包 还是 Batch包 * 3、jdk为1.8 * 4、必须指定主键 * 5、使用ROW_NUMBER() OVER (PARTITION BY t.flag ORDER BY t.num DESC) as id方式获取行号 * 6、要引入mysql的connector * 7、要引入mysql driver * * @date 2021/3/11 17:47 **/ public class BatchCreateSelectInsertDemoWithJdbcConnector { public static void main(String[] args) { // //StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // //StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // TableEnvironment tableEnv = TableEnvironment.create(bsSettings); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(bbSettings); // 1、创建表 tableEnv.executeSql( "CREATE TABLE flink_stu (" + " id BIGINT, " + " name STRING, " + " speciality STRING, " + " PRIMARY KEY (id) NOT ENFORCED" + ") WITH (" + "'connector' = 'jdbc'," + "'url'='jdbc:mysql://xxx.xxx.xxx.xxx:3306/test'," + "'table-name' = 'stu'," + "'username' = 'root', " + "'password' = 'xxxxxx'" + ")"); // tableEnv.executeSql("select * from stu"); // 2、查询表 TableResult tableResult = tableEnv.sqlQuery( "SELECT id, name,speciality FROM flink_stu").execute(); tableResult.print(); CloseableIterator<Row> it = tableResult.collect(); while (it.hasNext()) { Row row = it.next(); System.out.println(row.getField(0)); } TableSchema tableSchema = tableResult.getTableSchema(); System.out.println(tableSchema.getFieldName(1).toString()); // 3、创建输出表 tableEnv.executeSql( "CREATE TABLE flink_stu_tmp (" + " id BIGINT," + " name STRING, " + " num BIGINT," + " PRIMARY KEY (id) NOT ENFORCED" + ") WITH (" + "'connector' = 'jdbc'," + "'url'='jdbc:mysql://xxx.xxx.xxx.xxx:3306/test'," + "'table-name' = 'stu_tmp'," + "'username' = 'root', " + "'password' = 'xxxxxx'" + ")"); tableEnv.executeSql( "INSERT INTO flink_stu_tmp(id,name, num) " + "SELECT " + "ROW_NUMBER() OVER (PARTITION BY t.flag ORDER BY t.num DESC) as id, " + "t.name as name, " + "t.num as num " + "from ( " + "select name, count(name) as num, '1' as flag from flink_stu group by name) t"); //tableResult1.print(); String[] tables = tableEnv.listTables(); for(String t : tables) { System.out.println(t); } try { env.execute("jobName"); } catch (Exception e) { e.printStackTrace(); } } }
package flinksqldemo; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import java.time.LocalDate; import java.util.HashSet; import java.util.Set; import static org.apache.flink.table.api.Expressions.*; /** * Example for getting started with the Table & SQL API. * * <p>The example shows how to create, transform, and query a table. It should give a first * impression about the look-and-feel of the API without going too much into details. See the other * examples for using connectors or more complex operations. * * <p>In particular, the example shows how to * * <ul> * <li>setup a {@link TableEnvironment}, * <li>use the environment for creating example tables, registering views, and executing SQL * queries, * <li>transform tables with filters and projections, * <li>declare user-defined functions, * <li>and print/collect results locally. * </ul> * * <p>The example executes two Flink jobs. The results are written to stdout. */ public class StreamDataTableAndRowTypeAndUDF { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // BLINK STREAMING QUERY StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // create a table with example data without a connector required final Table rawCustomers = tableEnv.fromValues( Row.of("Guillermo Smith",LocalDate.parse("1992-12-12"),"4081 Valley Road","08540","New Jersey","m",true,0,78,3), Row.of("Valeria Mendoza",LocalDate.parse("1970-03-28"),"1239 Rainbow Road","90017","Los Angeles","f",true,9,39,0), Row.of("Leann Holloway",LocalDate.parse("1989-05-21"),"2359 New Street","97401","Eugene",null,true,null,null,null), Row.of("Brandy Sanders",LocalDate.parse("1956-05-26"),"4891 Walkers-Ridge-Way","73119","Oklahoma City","m",false,9,39,0), Row.of("John Turner",LocalDate.parse("1982-10-02"),"2359 New Street","60605","Chicago","m",true,12,39,0), Row.of("Ellen Ortega",LocalDate.parse("1985-06-18"),"2448 Rodney STreet","85023","Phoenix", "f",true,0,78,3)); final Table truncatedCustomers = rawCustomers.select(withColumns(range(1, 7))); // name columns final Table namedCustomers = truncatedCustomers.as( "name", "date_of_birth", "street", "zip_code", "city", "gender", "has_newsletter"); tableEnv.createTemporaryView("customers", namedCustomers); // use SQL whenever you like // call execute() and print() to get insights tableEnv.sqlQuery("SELECT " + " COUNT(*) AS `number of customers`, " + " AVG(YEAR(date_of_birth)) AS `average birth year` " + "FROM `customers`") .execute() .print(); // or further transform the data using the fluent Table API // e.g. 
filter, project fields, or call a user-defined function final Table youngCustomers = tableEnv.from("customers") .filter($("gender").isNotNull()) .filter($("has_newsletter").isEqual(true)) .filter($("date_of_birth").isGreaterOrEqual(LocalDate.parse("1980-01-01"))) .select($("name").upperCase(), $("date_of_birth"), call(AddressNormalizer.class, $("street"), $("zip_code"), $("city")).as("address")); rawCustomers.execute().print(); System.out.println("============================================================"); youngCustomers.execute().print(); System.out.println("==========================================================="); // use execute() and collect() to retrieve your results from the cluster // this can be useful for testing before storing it in an external system try (CloseableIterator<Row> iterator = youngCustomers.execute().collect()) { final Set<Row> expectedOutput = new HashSet<>(); expectedOutput.add( Row.of( "GUILLERMO SMITH", LocalDate.parse("1992-12-12"), "4081 VALLEY ROAD, 08540, NEW JERSEY")); expectedOutput.add( Row.of( "JOHN TURNER", LocalDate.parse("1982-10-02"), "2359 NEW STREET, 60605, CHICAGO")); expectedOutput.add( Row.of( "ELLEN ORTEGA", LocalDate.parse("1985-06-18"), "2448 RODNEY STREET, 85023, PHOENIX")); final Set<Row> actualOutput = new HashSet<>(); iterator.forEachRemaining(actualOutput::add); if (actualOutput.equals(expectedOutput)) { System.out.println("SUCCESS!"); } else { System.out.println("FAILURE!"); } } } public static class AddressNormalizer extends ScalarFunction { // the 'eval()' method defines input and output types (reflectively extracted) // and contains the runtime logic public String eval(String street, String zipCode, String city) { return normalize(street) + ", " + normalize(zipCode) + ", " + normalize(city); } private String normalize(String s) { return s.toUpperCase().replaceAll("\\W", " ").replaceAll("\\s+", " ").trim(); } } }
package flinksqldemo; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.Arrays; import java.util.Objects; import static org.apache.flink.table.api.Expressions.$; /** * Simple example for demonstrating the use of SQL on a Stream Table in Java. * <p>Usage: <code>flinksqldemo.StreamSQLExample --planner <blink|flink></code><br> * * <p>This example shows how to: - Convert DataStreams to Tables - Register a Table under a name - * Run a StreamSQL query on the registered Table **/ public class StreamSQLExample { public static void main(String[] args) throws Exception { // 在idea中的 Program arguments上填写:--planner blink final ParameterTool params = ParameterTool.fromArgs(args); String planner = params.has("planner") ? params.get("planner") : "blink"; //System.out.println(planner); // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv; if (Objects.equals(planner, "blink")) { EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); tEnv = StreamTableEnvironment.create(env, settings); } // use flink planner in streaming mode else if (Objects.equals(planner, "flink")) { EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build(); tEnv = StreamTableEnvironment.create(env, settings); } else { System.err.println( "The planner is incorrect. Please run 'flinksqldemo.StreamSQLExample --planner <planner>', " + "where planner (it is either flink or blink, and the default is blink) indicates whether the " + "example uses flink planner or blink planner."); return; } DataStream<Order> orderA = env.fromCollection( Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); DataStream<Order> orderB = env.fromCollection( Arrays.asList( new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1))); // convert DataStream to Table Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount")); // register DataStream as Table tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount")); // union the two tables Table result = tEnv.sqlQuery( "SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2"); // System.out.println(result.execute()); tEnv.toAppendStream(result, Order.class).print(); // after the table program is converted to DataStream program, // we must use `env.execute()` to submit the job. env.execute("jobNameTest"); } // ************************************************************************* // USER DATA TYPES // ************************************************************************* /** Simple POJO. */ public static class Order { public Long user; public String product; public int amount; public Order() {} public Order(Long user, String product, int amount) { this.user = user; this.product = product; this.amount = amount; } @Override public String toString() { return "Order{" + "user=" + user + ", product='" + product + '\'' + ", amount=" + amount + '}'; } } }
package flinksqldemo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.FileUtils; import java.io.File; import java.io.IOException; /** * Simple example for demonstrating the use of SQL in Java. * * <p>Usage: {@code ./bin/flink run ./examples/table/flinksqldemo.StreamWindowSQLExample.jar} * * <p>This example shows how to: - Register a table via DDL - Declare an event time attribute in the * DDL - Run a streaming window aggregate on the registered table **/ public class StreamWindowSQLExample { public static void main(String[] args) throws Exception { // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // write source data into temporary file and get the absolute path String contents = "1,beer,3,2019-12-12 00:00:01\n" + "1,diaper,4,2019-12-12 00:00:02\n" + "2,pen,3,2019-12-12 00:00:04\n" + "2,rubber,3,2019-12-12 00:00:06\n" + "3,rubber,2,2019-12-12 00:00:05\n" + "4,beer,1,2019-12-12 00:00:08"; String path = createTempFile(contents); // register table via DDL with watermark, // the events are out of order, hence, we use 3 seconds to wait the late events String ddl = "CREATE TABLE orders (" + " user_id INT, " + " product STRING, " + " amount INT, " + " ts TIMESTAMP(3), " + " WATERMARK FOR ts AS ts - INTERVAL '3' SECOND " + ") WITH (" + " 'connector.type' = 'filesystem', " + " 'connector.path' = '" + path + "'," + " 'format.type' = 'csv' " + ")"; tEnv.executeSql(ddl); // run a SQL query on the table and retrieve the result as a new Table String query = "SELECT" + " CAST(TUMBLE_START(ts, INTERVAL '5' SECOND) AS STRING) window_start, " + " COUNT(*) order_num, " + " SUM(amount) total_amount, " + " COUNT(DISTINCT product) unique_products " + "FROM orders " + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)"; Table result = tEnv.sqlQuery(query); tEnv.toAppendStream(result, Row.class).print(); // after the table program is converted to DataStream program, // we must use `env.execute()` to submit the job env.execute("Streaming Window SQL Job"); // should output: // 2019-12-12 00:00:05.000,3,6,2 // 2019-12-12 00:00:00.000,3,10,3 } /** Creates a temporary file with the contents and returns the absolute path. */ private static String createTempFile(String contents) throws IOException { File tempFile = File.createTempFile("orders", ".csv"); tempFile.deleteOnExit(); FileUtils.writeFileUtf8(tempFile, contents); return tempFile.toURI().toString(); } }
package flinksqldemo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import static org.apache.flink.table.api.Expressions.$; /** * @author tuzuoquan * @version 1.0 * @ClassName flinksqldemo.WordCountSQL * @description TODO * @date 2021/3/15 16:12 **/ public class WordCountSQL { public static void main(String[] args) throws Exception { // set up execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); DataSet<WC> input = env.fromElements(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1)); // register the DataSet as a view "WordCount" tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency")); // run a SQL query on the Table and retrieve the result as a new Table Table table = tEnv.sqlQuery( "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); DataSet<WC> result = tEnv.toDataSet(table, WC.class); result.print(); } // ************************************************************************* // USER DATA TYPES // ************************************************************************* public static class WC { public String word; public long frequency; // public constructor to make it a Flink POJO public WC() {} public WC(String word, long frequency) { this.word = word; this.frequency = frequency; } @Override public String toString() { return "WC " + word + " " + frequency; } } }
package flinksqldemo;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import static org.apache.flink.table.api.Expressions.$; /** * Simple example for demonstrating the use of the Table API for a Word Count in Java. * * <p>This example shows how to: - Convert DataSets to Tables - Apply group, aggregate, select, and * filter operations */ public class WordCountTable { // ************************************************************************* // PROGRAM // ************************************************************************* public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); DataSet<WC> input = env.fromElements(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1)); Table table = tEnv.fromDataSet(input); Table filtered = table.groupBy($("word")) .select($("word"), $("frequency").sum().as("frequency")) .filter($("frequency").isEqual(2)); DataSet<WC> result = tEnv.toDataSet(filtered, WC.class); result.print(); } // ************************************************************************* // USER DATA TYPES // ************************************************************************* /** Simple POJO containing a word and its respective count. */ public static class WC { public String word; public long frequency; // public constructor to make it a Flink POJO public WC() {} public WC(String word, long frequency) { this.word = word; this.frequency = frequency; } @Override public String toString() { return "WC " + word + " " + frequency; } } }
A Flink/Scala program reports:
`Error:(55, 38) Static methods in interface require -target:jvm-1.8
val env = TableEnvironment.create(settings)`
Solution:
https://blog.csdn.net/tototuzuoquan/article/details/114841671
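The commented-out scala-maven-plugin block at the end of the pom.xml above already carries the relevant setting; the essential part is sketched below (plugin version taken from that comment, shown only for illustration):

<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <version>3.2.2</version>
  <configuration>
    <args>
      <arg>-nobootcp</arg>
    </args>
    <!-- resolves: Static methods in interface require -target:jvm-1.8 -->
    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
  </configuration>
</plugin>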
package org.apache.flink.table.examples.scala.basics import java.time.LocalDate import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, _} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row import org.apache.flink.util.CloseableIterator import scala.collection.JavaConverters._ /** * Example for getting started with the Table & SQL API in Scala. * * The example shows how to create, transform, and query a table. It should give a first impression * about the look-and-feel of the API without going too much into details. See the other examples for * using connectors or more complex operations. * * In particular, the example shows how to * - setup a [[TableEnvironment]], * - use the environment for creating example tables, registering views, and executing SQL queries, * - transform tables with filters and projections, * - declare user-defined functions, * - and print/collect results locally. * * The example executes two Flink jobs. The results are written to stdout. */ object GettingStartedExample { def main(args: Array[String]): Unit = { // setup the unified API // in this case: declare that the table programs should be executed in batch mode val settings = EnvironmentSettings.newInstance() .inBatchMode() .build() val env = TableEnvironment.create(settings) // create a table with example data without a connector required val rawCustomers = env.fromValues( row("Guillermo Smith", LocalDate.parse("1992-12-12"), "4081 Valley Road", "08540", "New Jersey", "m", true, 0, 78, 3), row("Valeria Mendoza", LocalDate.parse("1970-03-28"), "1239 Rainbow Road", "90017", "Los Angeles", "f", true, 9, 39, 0), row("Leann Holloway", LocalDate.parse("1989-05-21"), "2359 New Street", "97401", "Eugene", null, true, null, null, null), row("Brandy Sanders", LocalDate.parse("1956-05-26"), "4891 Walkers-Ridge-Way", "73119", "Oklahoma City", "m", false, 9, 39, 0), row("John Turner", LocalDate.parse("1982-10-02"), "2359 New Street", "60605", "Chicago", "m", true, 12, 39, 0), row("Ellen Ortega", LocalDate.parse("1985-06-18"), "2448 Rodney STreet", "85023", "Phoenix", "f", true, 0, 78, 3) ) // handle ranges of columns easily val truncatedCustomers = rawCustomers.select(withColumns(1 to 7)) // name columns val namedCustomers = truncatedCustomers .as("name", "date_of_birth", "street", "zip_code", "city", "gender", "has_newsletter") // register a view temporarily env.createTemporaryView("customers", namedCustomers) // use SQL whenever you like // call execute() and print() to get insights env .sqlQuery(""" |SELECT | COUNT(*) AS `number of customers`, | AVG(YEAR(date_of_birth)) AS `average birth year` |FROM `customers` |""".stripMargin ) .execute() .print() // or further transform the data using the fluent Table API // e.g. 
filter, project fields, or call a user-defined function val youngCustomers = env .from("customers") .filter($"gender".isNotNull) .filter($"has_newsletter" === true) .filter($"date_of_birth" >= LocalDate.parse("1980-01-01")) .select( $"name".upperCase(), $"date_of_birth", call(classOf[AddressNormalizer], $"street", $"zip_code", $"city").as("address") ) // use execute() and collect() to retrieve your results from the cluster // this can be useful for testing before storing it in an external system var iterator: CloseableIterator[Row] = null try { iterator = youngCustomers.execute().collect() val actualOutput = iterator.asScala.toSet val expectedOutput = Set( Row.of("GUILLERMO SMITH", LocalDate.parse("1992-12-12"), "4081 VALLEY ROAD, 08540, NEW JERSEY"), Row.of("JOHN TURNER", LocalDate.parse("1982-10-02"), "2359 NEW STREET, 60605, CHICAGO"), Row.of("ELLEN ORTEGA", LocalDate.parse("1985-06-18"), "2448 RODNEY STREET, 85023, PHOENIX") ) if (actualOutput == expectedOutput) { println("SUCCESS!") } else { println("FAILURE!") } } finally { if (iterator != null) { iterator.close() } } } /** * We can put frequently used procedures in user-defined functions. * * It is possible to call third-party libraries here as well. */ class AddressNormalizer extends ScalarFunction { // the 'eval()' method defines input and output types (reflectively extracted) // and contains the runtime logic def eval(street: String, zipCode: String, city: String): String = { normalize(street) + ", " + normalize(zipCode) + ", " + normalize(city) } private def normalize(s: String) = { s.toUpperCase.replaceAll("\\W", " ").replaceAll("\\s+", " ").trim } } }
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.table.examples.scala.basics import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ /** * Simple example for demonstrating the use of SQL on a Stream Table in Scala. * * <p>Usage: <code>flinksqldemo.StreamSQLExample --planner <blink|flink></code><br> * * <p>This example shows how to: * - Convert DataStreams to Tables * - Register a Table under a name * - Run a StreamSQL query on the registered Table */ object StreamSQLExample { // ************************************************************************* // PROGRAM // ************************************************************************* def main(args: Array[String]): Unit = { val params = ParameterTool.fromArgs(args) val planner = if (params.has("planner")) params.get("planner") else "blink" // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = if (planner == "blink") { // use blink planner in streaming mode val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() StreamTableEnvironment.create(env, settings) } else if (planner == "flink") { // use flink planner in streaming mode val settings = EnvironmentSettings.newInstance() .useOldPlanner() .inStreamingMode() .build() StreamTableEnvironment.create(env, settings) } else { System.err.println("The planner is incorrect. Please run 'flinksqldemo.StreamSQLExample --planner <planner>', " + "where planner (it is either flink or blink, and the default is blink) indicates whether the " + "example uses flink planner or blink planner.") return } val orderA: DataStream[Order] = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))) val orderB: DataStream[Order] = env.fromCollection(Seq( Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))) // convert DataStream to Table val tableA = tEnv.fromDataStream(orderA, $"user", $"product", $"amount") // register DataStream as Table tEnv.createTemporaryView("OrderB", orderB, $"user", $"product", $"amount") // union the two tables val result = tEnv.sqlQuery( s""" |SELECT * FROM $tableA WHERE amount > 2 |UNION ALL |SELECT * FROM OrderB WHERE amount < 2 """.stripMargin) result.toAppendStream[Order].print() env.execute() } // ************************************************************************* // USER DATA TYPES // ************************************************************************* case class Order(user: Long, product: String, amount: Int) }
package org.apache.flink.table.examples.scala.basics import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ /** * Simple example for demonstrating the use of Table API on a Stream Table. * * This example shows how to: * - Convert DataStreams to Tables * - Apply union, select, and filter operations */ object StreamTableExample { // ************************************************************************* // PROGRAM // ************************************************************************* def main(args: Array[String]): Unit = { // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val orderA = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))).toTable(tEnv) val orderB = env.fromCollection(Seq( Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))).toTable(tEnv) // union the two tables val result: DataStream[Order] = orderA.unionAll(orderB) .select('user, 'product, 'amount) .where('amount > 2) .toAppendStream[Order] result.print() env.execute() } // ************************************************************************* // USER DATA TYPES // ************************************************************************* case class Order(user: Long, product: String, amount: Int) }
package org.apache.flink.table.examples.scala.basics import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ /** * This program implements a modified version of the TPC-H query 3. The * example demonstrates how to assign names to fields by extending the Tuple class. * The original query can be found at * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) * (page 29). * * This program implements the following SQL equivalent: * * {{{ * SELECT * l_orderkey, * SUM(l_extendedprice*(1-l_discount)) AS revenue, * o_orderdate, * o_shippriority * FROM customer, * orders, * lineitem * WHERE * c_mktsegment = '[SEGMENT]' * AND c_custkey = o_custkey * AND l_orderkey = o_orderkey * AND o_orderdate < date '[DATE]' * AND l_shipdate > date '[DATE]' * GROUP BY * l_orderkey, * o_orderdate, * o_shippriority * ORDER BY * revenue desc, * o_orderdate; * }}} * * Input files are plain text CSV files using the pipe character ('|') as field separator * as generated by the TPC-H data generator which is available at * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/). * * Usage: * {{{ * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> * }}} * * This example shows how to: * - Convert DataSets to Tables * - Use Table API expressions * */ object TPCHQuery3Table { // ************************************************************************* // PROGRAM // ************************************************************************* def main(args: Array[String]) { if (!parseParameters(args)) { return } // set filter date val date = "1995-03-12".toDate // get execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = BatchTableEnvironment.create(env) val lineitems = getLineitemDataSet(env) .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate) .filter('shipDate.toDate > date) val customers = getCustomerDataSet(env) .toTable(tEnv, 'id, 'mktSegment) .filter('mktSegment === "AUTOMOBILE") val orders = getOrdersDataSet(env) .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio) .filter('orderDate.toDate < date) val items = orders.join(customers) .where('custId === 'id) .select('orderId, 'orderDate, 'shipPrio) .join(lineitems) .where('orderId === 'id) .select( 'orderId, 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue, 'orderDate, 'shipPrio) val result = items .groupBy('orderId, 'orderDate, 'shipPrio) .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio) .orderBy('revenue.desc, 'orderDate.asc) // emit result result.writeAsCsv(outputPath, "\n", "|") // execute program env.execute("Scala TPCH Query 3 (Table API Expression) Example") } // ************************************************************************* // USER DATA TYPES // ************************************************************************* case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String) case class Customer(id: Long, mktSegment: String) case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long) // ************************************************************************* // UTIL METHODS // ************************************************************************* private var lineitemPath: String = _ private var customerPath: String = _ private var ordersPath: String = _ private var outputPath: String = _ private def parseParameters(args: Array[String]): Boolean = { if (args.length == 4) { lineitemPath = args(0) 
customerPath = args(1) ordersPath = args(2) outputPath = args(3) true } else { System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + " Due to legal restrictions, we can not ship generated data.\n" + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " + "<orders-csv path> <result path>") false } } private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { env.readCsvFile[Lineitem]( lineitemPath, fieldDelimiter = "|", includedFields = Array(0, 5, 6, 10) ) } private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { env.readCsvFile[Customer]( customerPath, fieldDelimiter = "|", includedFields = Array(0, 6) ) } private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { env.readCsvFile[Order]( ordersPath, fieldDelimiter = "|", includedFields = Array(0, 1, 4, 7) ) } }
package org.apache.flink.table.examples.scala.basics import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ /** * Simple example that shows how the Batch SQL API is used in Scala. * * This example shows how to: * - Convert DataSets to Tables * - Register a Table under a name * - Run a SQL query on the registered Table * */ object WordCountSQL { // ************************************************************************* // PROGRAM // ************************************************************************* def main(args: Array[String]): Unit = { // set up execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = BatchTableEnvironment.create(env) val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) // register the DataSet as a view "WordCount" tEnv.createTemporaryView("WordCount", input, $"word", $"frequency") // run a SQL query on the Table and retrieve the result as a new Table val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") table.toDataSet[WC].print() } // ************************************************************************* // USER DATA TYPES // ************************************************************************* case class WC(word: String, frequency: Long) }
package org.apache.flink.table.examples.scala.basics import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ /** * Simple example for demonstrating the use of the Table API for a Word Count in Scala. * * This example shows how to: * - Convert DataSets to Tables * - Apply group, aggregate, select, and filter operations * */ object WordCountTable { // ************************************************************************* // PROGRAM // ************************************************************************* def main(args: Array[String]): Unit = { // set up execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = BatchTableEnvironment.create(env) val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) val expr = input.toTable(tEnv) val result = expr .groupBy($"word") .select($"word", $"frequency".sum as "frequency") .filter($"frequency" === 2) .toDataSet[WC] result.print() } // ************************************************************************* // USER DATA TYPES // ************************************************************************* case class WC(word: String, frequency: Long) }
Configuration reference:
[jdbc](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html)
[kafka](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html)

Trigger mode: the aggregation is triggered once per incoming record.

Source Kafka JSON data format
topic: flink_test
msg: {"day_time": "20201009","id": 7,"amnount":20}

Create the topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink_test

Check that the topic exists:
bin/kafka-topics.sh --list --zookeeper localhost:2181

Produce test data:
[root@flink01 flink-1.12.1]# cd $KAFKA_HOME
[root@middleware kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_test
>{"day_time": "20201009","id": 7,"amnount":20}
>
Sink MySQL table DDL:
CREATE TABLE sync_test_1 (
`day_time` varchar(64) NOT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
[root@flink01 ~]# cd $FLINK_HOME
[root@flink01 flink-1.12.1]# bin/sql-client.sh embedded
Flink SQL statements:
create table flink_test_1 (
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
) with (
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092',
  'properties.group.id' = 'flink_gp_test1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = 'xxx.xxx.xxx.xxx:2181/kafka'
);

CREATE TABLE sync_test_1 (
  day_time string,
  total_gmv bigint,
  PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test?characterEncoding=UTF-8',
  'table-name' = 'sync_test_1',
  'username' = 'root',
  'password' = '123456'
);

INSERT INTO sync_test_1
SELECT day_time, SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;
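Once the job is running and has consumed the sample message above, the result can be checked directly in MySQL; a minimal check (the expected value assumes only the single sample record was produced):

SELECT day_time, total_gmv FROM sync_test_1;
-- expected (for the single sample message): day_time = '20201009', total_gmv = 20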
cd $FLINK_HOME
./bin/sql-client.sh embedded
Source Kafka JSON data format
Create the topics:
cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink_test_1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink_test_2
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_test_1
topic flink_test_1:
{"day_time": "20201011","id": 8,"amnount":211}
{"day_time": "20211011","id": 1,"amnount":211}
{"day_time": "20211012","id": 2,"amnount":211}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_test_2
topic flink_test_2:
{"id": 8,"coupon_amnount":100}
{"id": 1,"coupon_amnount":100}
{"id": 2,"coupon_amnount":100}
Note: the join is triggered for every record arriving on either stream.
Sink MySQL table DDL:
CREATE TABLE `sync_test_2` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`day_time` varchar(64) DEFAULT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx` (`day_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
Flink SQL statements:
create table flink_test_2_1 (
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
) with (
  'connector' = 'kafka',
  'topic' = 'flink_test_1',
  'properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092',
  'properties.group.id' = 'flink_gp_test2-1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = 'xxx.xxx.xxx.xxx:2181/kafka'
);

create table flink_test_2_2 (
  id BIGINT,
  coupon_amnount BIGINT,
  proctime AS PROCTIME ()
) with (
  'connector' = 'kafka',
  'topic' = 'flink_test_2',
  'properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092',
  'properties.group.id' = 'flink_gp_test2-2',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = 'xxx.xxx.xxx.xxx:2181/kafka'
);

CREATE TABLE sync_test_2 (
  day_time string,
  total_gmv bigint,
  PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test?characterEncoding=UTF-8',
  'table-name' = 'sync_test_2',
  'username' = 'root',
  'password' = '123456'
);

INSERT INTO sync_test_2
SELECT day_time, SUM(amnount - coupon_amnount) AS total_gmv
FROM (
  SELECT
    a.day_time as day_time,
    a.amnount as amnount,
    b.coupon_amnount as coupon_amnount
  FROM flink_test_2_1 as a
  LEFT JOIN flink_test_2_2 b on b.id = a.id
) GROUP BY day_time;
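With the sample messages above (amnount 211 on flink_test_1 and coupon_amnount 100 on flink_test_2 for ids 1, 2 and 8), each day_time should eventually settle at 211 - 100 = 111 once both streams have been joined. A rough check on the MySQL side (a sketch under those assumptions; intermediate values may differ because the regular join emits updates per record):

SELECT day_time, total_gmv FROM sync_test_2 ORDER BY day_time;
-- expected final values: 20201011 -> 111, 20211011 -> 111, 20211012 -> 111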
Source Kafka JSON data format
topic flink_test_1:
{"day_time": "20201011","id": 8,"amnount":211}
MySQL dimension table test_dim:
CREATE TABLE `test_dim` (
`id` bigint(11) NOT NULL,
`coupon_amnount` bigint(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Records of test_dim
-- ----------------------------
BEGIN;
INSERT INTO `test_dim` VALUES (1, 1);
INSERT INTO `test_dim` VALUES (3, 1);
INSERT INTO `test_dim` VALUES (8, 1);
COMMIT;
Sink MySQL table DDL:
CREATE TABLE `sync_test_3` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`day_time` varchar(64) DEFAULT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx` (`day_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
Flink SQL statements:
create table flink_test_3 (
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
) with (
  'connector' = 'kafka',
  'topic' = 'flink_test_1',
  'properties.bootstrap.servers' = '172.25.20.76:9092',
  'properties.group.id' = 'flink_gp_test3',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
);

create table flink_test_3_dim (
  id BIGINT,
  coupon_amnount BIGINT
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
  'table-name' = 'test_dim',
  'username' = 'videoweb',
  'password' = 'suntek',
  'lookup.max-retries' = '3',
  'lookup.cache.max-rows' = '1000'
);

CREATE TABLE sync_test_3 (
  day_time string,
  total_gmv bigint,
  PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
  'table-name' = 'sync_test_3',
  'username' = 'videoweb',
  'password' = 'suntek'
);

INSERT INTO sync_test_3
SELECT day_time, SUM(amnount - coupon_amnount) AS total_gmv
FROM (
  SELECT
    a.day_time as day_time,
    a.amnount as amnount,
    b.coupon_amnount as coupon_amnount
  FROM flink_test_3 as a
  LEFT JOIN flink_test_3_dim FOR SYSTEM_TIME AS OF a.proctime as b
    ON b.id = a.id
) GROUP BY day_time;
Source Kafka JSON data format
topic flink_test_4
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2021-01-05 11:12:12"}
{"username":"zhp","click_url":"https://www.infoq.cn/video/BYSSg4hGR5oZmUFsL8Kb","ts":"2020-01-05 11:12:15"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:12:18"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2021-01-05 11:12:55"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2021-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2021-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2021-01-05 11:13:26"}
Sink MySQL table DDL:
CREATE TABLE `sync_test_tumble_output` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`window_start` datetime DEFAULT NULL,
`window_end` datetime DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`clicks` bigint(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
Flink SQL statements:
-- Enable mini-batch optimization
-- (see https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/config.html)
SET table.exec.mini-batch.enabled=true;
-- The mini-batch interval, i.e. the extra latency the job is allowed to add
SET table.exec.mini-batch.allow-latency=60s;
-- The maximum number of records buffered in one mini-batch
SET table.exec.mini-batch.size=5;

create table user_clicks (
  username varchar,
  click_url varchar,
  ts TIMESTAMP(3),
  -- ts BIGINT,
  -- ts2 AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) with (
  'connector' = 'kafka',
  'topic' = 'flink_test_4',
  'properties.bootstrap.servers' = '172.25.20.76:9092',
  'properties.group.id' = 'flink_gp_test4',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
);

CREATE TABLE sync_test_tumble_output (
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  username VARCHAR,
  clicks BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
  'table-name' = 'sync_test_tumble_output',
  'username' = 'videoweb',
  'password' = 'suntek'
);

INSERT INTO sync_test_tumble_output
SELECT
  TUMBLE_START(ts, INTERVAL '60' SECOND) as window_start,
  TUMBLE_END(ts, INTERVAL '60' SECOND) as window_end,
  username,
  COUNT(click_url)
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '60' SECOND), username;
Source Kafka JSON data format
topic flink_test_5
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2020-01-05 11:12:12"}
{"username":"zhp","click_url":"https://www.infoq.cn/video/BYSSg4hGR5oZmUFsL8Kb","ts":"2020-01-05 11:12:15"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:12:18"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2020-01-05 11:12:55"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2020-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:13:26"}
Sink MySQL table DDL:
CREATE TABLE `sync_test_hop_output` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`window_start` datetime DEFAULT NULL,
`window_end` datetime DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`clicks` bigint(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Flink SQL statements:
-- Enable mini-batch optimization
-- (see https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/config.html)
SET table.exec.mini-batch.enabled=true;
-- The mini-batch interval, i.e. the extra latency the job is allowed to add
SET table.exec.mini-batch.allow-latency=60s;
-- The maximum number of records buffered in one mini-batch
SET table.exec.mini-batch.size=5;

create table user_clicks (
  username varchar,
  click_url varchar,
  ts TIMESTAMP(3),
  -- ts BIGINT,
  -- ts2 AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) with (
  'connector' = 'kafka',
  'topic' = 'flink_test_5',
  'properties.bootstrap.servers' = '172.25.20.76:9092',
  'properties.group.id' = 'flink_gp_test5',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
);

CREATE TABLE sync_test_hop_output (
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  username VARCHAR,
  clicks BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
  'table-name' = 'sync_test_hop_output',
  'username' = 'videoweb',
  'password' = 'suntek'
);

-- Count each user's clicks over the last 1 minute, updated every 30 seconds,
-- i.e. a 1-minute window that slides by 30 seconds
INSERT INTO sync_test_hop_output
SELECT
  HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start,
  HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end,
  username,
  COUNT(click_url)
FROM user_clicks
GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username;
1. Introduction to datagen
Flink 1.11 ships with a built-in DataGen connector that generates random data. It is mainly used for functional and performance testing of streaming jobs when no real data source is available.
To use it, start the SQL client:
bin/sql-client.sh embedded
or
bin/sql-client.sh embedded -l <path to the dependency jars>
Flink SQL test:
CREATE TABLE datagen (
  f_sequence INT,
  f_random INT,
  f_random_str STRING,
  ts AS localtimestamp,
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  -- optional options --
  'rows-per-second'='5',
  'fields.f_sequence.kind'='sequence',
  'fields.f_sequence.start'='1',
  'fields.f_sequence.end'='1000',
  'fields.f_random.min'='1',
  'fields.f_random.max'='1000',
  'fields.f_random_str.length'='10'
);

select * from datagen;
2.2 Parameter explanation
In the DDL's WITH options, only connector is required; all other options are optional.
rows-per-second: the number of rows generated per second.
f_sequence is generated as a sequence with a given start and end value, so the job finishes once the sequence reaches its end value.
f_random is generated randomly within the given range.
f_random_str is a string column; the options set the length of the generated string to 10.
ts is a computed column that returns the current timestamp.
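Because ts also carries a watermark, the datagen table can be used to try out event-time windows without any external source; a minimal sketch (10-second tumbling window, interval chosen arbitrarily):

SELECT
  TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
  COUNT(*) AS cnt,
  MAX(f_random) AS max_random
FROM datagen
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND);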
Put the Flink Kafka connector jar and the flink-json / flink-avro jars into Flink's lib directory.
Create two topics:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pageviews
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pageviews_per_region
List the topics:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
Produce data with the console producer:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic pageviews
>{"user_id":11,"page_id":1,"viewtime":"2007-12-03 10:15:30","user_region":"hangzhou"}
>{"user_id":12,"page_id":2,"viewtime":"2008-12-03 10:15:30","user_region":"hangzhou"}
>{"user_id":13,"page_id":3,"viewtime":"2009-12-03 10:15:30","user_region":"hangzhou"}
>{"user_id":14,"page_id":4,"viewtime":"2010-12-03 10:15:30","user_region":"henan"}
Consume data with the console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageviews_per_region
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092',
  'format' = 'json'
);

-- Compute pv and uv and insert them into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
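Because the sink values are Avro-encoded, the console consumer above prints binary data; it is usually easier to read the result back through Flink SQL, since an upsert-kafka table can also be used as a source. A minimal check (expected values assume exactly the four sample pageviews messages above):

SELECT * FROM pageviews_per_region;
-- expected final state: hangzhou -> pv 3, uv 3; henan -> pv 1, uv 1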
Elasticsearch query tool
Open http://xxx.xxx.xxx.xxx:5601/ and go to Dev Tools.
Elasticsearch data operations
Create a document in index visit with id testx1:
POST /visit/_doc/testx1
{
"user_id":1,"user_name":"zhangsan","uv":10,"pv":20
}
Query the document:
get /visit/_doc/testx1
Delete the document:
delete /visit/_doc/testx1
Create the sink table (Elasticsearch can only be used as a sink):
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
u_v BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://xxx.xxx.xxx.xxx:9200',
'index' = 'visit',
'document-type'='_doc'
);
Sample data:
After running the insert, the data will appear in Elasticsearch.
INSERT INTO default_database.myUserTable VALUES ('2','wangwu',11,21),('3','zhaoliu',21,31);
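After the INSERT job has run, the documents can be verified from Dev Tools in the same style as the commands above, for example:
get /visit/_search
which should now include the documents written for user_id '2' and '3'.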