赞
踩
软件版本:Spring Boot 2.1.6+Flink1.6.1+JDK1.8
程序主体:
@SpringBootApplication
public class HadesTmsApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(HadesTmsApplication.class);
application.setBannerMode(Banner.Mode.OFF);
application.run(args);
}
@Override
public void run(String... args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer010 kafkaConsumer = new FlinkKafkaConsumer010<>("topic-name"), new SimpleStringSchema(), getProperties());
DataStream dataStream = env.addSource(kafkaConsumer);
// 此处省略处理逻辑
dataStream.addSink(new MySink());
}
private Properties getProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrap_servers);
properties.setProperty("zookeeper.connect", zookeeper_connect);
properties.setProperty("group.id", group_id);
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
}
说明一下:因为是非web项目,所以实现CommandLineRunner接口,重写run方法。在里面编写流处理逻辑。
如果在MySink中需要使用spring容器中的类&#
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。