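package com.itszt23.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.TimeUnit;

public class FlinkEnvironment01 {
    public static void main(String[] args) throws Exception {
        // Returns a local or a cluster environment depending on how the job is launched
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = environment.addSource(new SourceFunction<String>() {
            // Cleared by cancel() so the loop exits when the job is cancelled
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                // Emit the current timestamp every 500 ms
                while (running) {
                    sourceContext.collect(String.valueOf(System.currentTimeMillis()));
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        streamSource.print();
        environment.execute();
    }
}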
2> 1692668361022
3> 1692668361537
4> 1692668362039
5> 1692668362542
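In the output above, the number before `>` is the index of the parallel print subtask that wrote the record, and the value after it is the epoch-millisecond timestamp emitted by the source. As a minimal sketch in plain Java (no Flink dependency), the following decodes those lines and shows that consecutive records are roughly 500 ms apart. `PrintLineDecoder` is a hypothetical helper name used only for illustration.

```java
import java.time.Instant;

// Illustrative helper (not part of Flink): decodes one line printed by print().
class PrintLineDecoder {
    /** Returns {subtaskIndex, epochMillis} for a line such as "2> 1692668361022". */
    static long[] decode(String line) {
        int sep = line.indexOf('>');
        long subtask = Long.parseLong(line.substring(0, sep).trim());
        long millis = Long.parseLong(line.substring(sep + 1).trim());
        return new long[] {subtask, millis};
    }

    public static void main(String[] args) {
        String[] lines = {"2> 1692668361022", "3> 1692668361537", "4> 1692668362039", "5> 1692668362542"};
        long previous = -1;
        for (String line : lines) {
            long[] parts = decode(line);
            // The gap is only defined from the second record onwards
            String gap = previous < 0 ? "" : ", " + (parts[1] - previous) + " ms after the previous record";
            System.out.println("subtask " + parts[0] + " at " + Instant.ofEpochMilli(parts[1]) + gap);
            previous = parts[1];
        }
    }
}
```

The gaps come out slightly above 500 ms because each loop iteration also spends a little time emitting the record.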
<!-- Dependency that lets a job started directly from IDEA serve the Flink web UI -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.12.7</version>
</dependency>
Configuration configuration = new Configuration();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
http://localhost:8081
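A packaging plugin is required; otherwise the default jar does not include the dependencies (external libraries).
In pom.xml, add it directly after </dependencies>: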
<!-- Packaging -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.itszt23.flink.FlinkEnvironment01</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The main class must be specified by its fully qualified name:
<mainClass>com.itszt23.flink.FlinkEnvironment01</mainClass>
java -jar
runs the jar directly.
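start-cluster.bat is required.
Start the .bat first; two console windows pop up, one for the JobManager and one for the TaskManager.
Then open http://localhost:8081/
Upload the jar.
After it runs, the job window does not change and the task window shows the job's output.
Press Ctrl+C to exit.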
wget https://repo.huaweicloud.com/apache/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz
tar -xf flink-1.12.7-bin-scala_2.12.tgz
cd flink-1.12.7/
cd bin
./start-cluster.sh
./start-cluster.sh starts the cluster.
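Visit http://<server IP address>:8081 (remember to open port 8081 in the firewall).
Upload the jar.
Logs can be found in the log folder.
Smaller files can be viewed directly in the vi editor: vi flink-ubuntu-taskexecutor-0-VM-8-4-ubuntu.out
tail -f
follows a large log file as new output is written to it.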
./stop-cluster.sh
To shut down, first use ps -ef | grep flink to find the process IDs, which in this example are 15256 and 15533.
kill -9 15256
kill -9 15533
ps -ef |grep flink
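The lookup step above can also be sketched from Java with the JDK's `ProcessHandle` API (Java 9 or later), which may be handy when a tool needs the PIDs programmatically. This is only an illustration under stated assumptions: `FlinkProcessFinder` and its methods are hypothetical names, and the match is a plain case-sensitive substring test on the command line, similar in spirit to `grep flink`. It only lists candidates and does not kill anything.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative helper (hypothetical name): lists PIDs whose command line mentions a keyword.
class FlinkProcessFinder {
    /** Case-sensitive substring match, like plain grep. */
    static boolean matches(String commandLine, String keyword) {
        return commandLine.contains(keyword);
    }

    static List<Long> findPids(String keyword) {
        return ProcessHandle.allProcesses()
                // Command lines of other users' processes may be unavailable; treat them as empty
                .filter(p -> matches(p.info().commandLine().orElse(""), keyword))
                .map(ProcessHandle::pid)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (long pid : findPids("flink")) {
            System.out.println("candidate PID: " + pid);
        }
    }
}
```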
nohup
Use the scp command to copy the jar to the cloud server.
nohup java -jar demo-flink-1.0-SNAPSHOT.jar > myout.txt 2>&1 &
The output is written to myout.txt.
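To make the redirection in that command concrete: `> myout.txt` sends standard output to the file, and `2>&1` sends standard error to the same place. A rough Java equivalent using the JDK's `ProcessBuilder` is sketched below. `JarLauncher` is a hypothetical class name, and unlike `nohup ... &` this sketch does nothing to protect the child process from a hangup when the session ends.

```java
import java.io.File;

// Illustrative sketch (hypothetical class name): prepares a launch of the jar
// with stdout and stderr written to one file.
class JarLauncher {
    static ProcessBuilder build(String jar, String logFile) {
        ProcessBuilder builder = new ProcessBuilder("java", "-jar", jar);
        builder.redirectErrorStream(true);          // merge stderr into stdout, like 2>&1
        builder.redirectOutput(new File(logFile));  // write stdout to the file, like > myout.txt
        return builder;
    }

    public static void main(String[] args) {
        ProcessBuilder builder = build("demo-flink-1.0-SNAPSHOT.jar", "myout.txt");
        System.out.println("prepared: " + String.join(" ", builder.command()));
        // Calling builder.start() would actually launch the job.
    }
}
```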
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 9000);
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
vi flink-conf.yaml
Use the search command /rest.port or ?rest.port to locate the setting, then change it in the same way.
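flink-conf.yaml in this Flink version is a flat list of `key: value` lines, and the rest.port entry is typically commented out by default, so the web UI stays on 8081 until the line is uncommented and edited. As a minimal sketch of how such a file resolves the port, the plain-Java helper below scans the lines, ignores `#` comments, and falls back to 8081. `RestPortReader` is a hypothetical name and this is not Flink's own loader; the lines could come from `Files.readAllLines` on the real file.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper (hypothetical name): finds rest.port among flat "key: value" lines.
class RestPortReader {
    static final int DEFAULT_REST_PORT = 8081;

    static int restPort(List<String> lines) {
        for (String raw : lines) {
            // Drop everything after '#', so commented-out entries are ignored
            String line = raw.split("#", 2)[0].trim();
            if (line.isEmpty()) {
                continue;
            }
            int sep = line.indexOf(':');
            if (sep > 0 && line.substring(0, sep).trim().equals("rest.port")) {
                return Integer.parseInt(line.substring(sep + 1).trim());
            }
        }
        return DEFAULT_REST_PORT;
    }

    public static void main(String[] args) {
        List<String> edited = Arrays.asList("jobmanager.rpc.port: 6123", "rest.port: 9000");
        System.out.println(restPort(edited));
    }
}
```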