赞
踩
我这里使用的是 jdk17
,
flink版本 1.18.1
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.xyz2022</groupId> <artifactId>flink-demo1</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.18.1</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>2.0.12</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers combine.children="append"> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
/resources
目录下配置 log4j
log4j.rootLogger=debug, stdout, R log4j.logger.org=error log4j.logger.cn.xyz2022=debug log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d [%5p] [%20c] [%10t] %m%n log4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=./log/flink.log log4j.appender.R.MaxFileSize=10MB # Keep one backup file log4j.appender.R.MaxBackupIndex=5 log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d [%5p] [%20c] [%10t] %m%n
package cn.xyz2022; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @Slf4j public class FlinkStreamJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); DataStreamSource<Long> streamSource = environment.addSource(new SourceFunction<Long>() { private boolean running = true; /** * 这里定义一个每秒输出当前毫秒数的source */ @Override public void run(SourceContext<Long> sourceContext) throws Exception { while (running) { long ts = System.currentTimeMillis(); ts = ts - ts % 1000; sourceContext.collect(ts); Thread.sleep(1000 - ts % 1000); } } @Override public void cancel() { running = false; } }); streamSource.print("out >>"); environment.execute(); } }
输出结果
out >>> 1709635317000 out >>> 1709635318000 out >>> 1709635319000 out >>> 1709635320000 out >>> 1709635321000 out >>> 1709635322000 out >>> 1709635323000 out >>> 1709635324000 out >>> 1709635325000 out >>> 1709635326000 out >>> 1709635327000 out >>> 1709635328000 out >>> 1709635329000 out >>> 1709635330000 out >>> 1709635331000 out >>> 1709635332000 out >>> 1709635333000 out >>> 1709635334000 out >>> 1709635335000 out >>> 1709635336000 out >>> 1709635337000 out >>> 1709635338000 out >>> 1709635339000
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。