赞
踩
测试 Flink 1.12.7 连接 Kafka,测试代码如下:
- package org.test.flink;
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-
- import java.util.Properties;
-
- public class FlinkKafka { // Demo job: reads String records from a Kafka topic and prints them
- public static void main(String[] args) throws Exception {
- // 1. Execution environment (env)
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // 2. Data source (source)
- // 2-1. Build the properties used when consuming from Kafka (broker address and consumer group id)
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "11.0.24.216:9092");
- props.setProperty("group.id", "test1111");
- System.out.println("--2-2. 构建FlinkKafkaConsumer实例对象--");
- // 2-2. Create the FlinkKafkaConsumer instance
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
- "testtopic", // topic to consume
- new SimpleStringSchema(), // schema used to turn each record into a String
- props
- );
- System.out.println("--2-3. 添加Source--");
- // 2-3. Register the consumer as the source of the job
- DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
-
- // 3. Transformation: none in this demo
- // 4. Sink: print every record
- System.out.println("--kafkaStream.print();--");
- kafkaStream.print();
-
- // 5. Trigger execution of the job
- env.execute("ConnectorFlinkKafkaConsumerDemo") ;
- }
- }

执行报错如下:
- Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
- at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
- at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
- at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
- at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
- at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
- at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
- at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
- at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
- at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
- at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
- at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
- at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1061)
- at akka.dispatch.OnComplete.internal(Future.scala:264)
- at akka.dispatch.OnComplete.internal(Future.scala:261)
- at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
- at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
- at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
- at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
- at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
- at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
- at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
- at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
- at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
- at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
- at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
- at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
- at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
- at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
- at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
- at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
- at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
- at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
- at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
- at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
- at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
- at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
- at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
- at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
- at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
- at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
- Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
- at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
- at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
- at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
- at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
- at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
- at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
- at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
- at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
- at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
- at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
- at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
- at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
- at akka.actor.Actor.aroundReceive(Actor.scala:517)
- at akka.actor.Actor.aroundReceive$(Actor.scala:515)
- at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
- at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
- at akka.actor.ActorCell.invoke(ActorCell.scala:561)
- at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
- at akka.dispatch.Mailbox.run(Mailbox.scala:225)
- at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
- ... 4 more
- Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition testtopic-0 could be determined
-
- Process finished with exit code 1

因为工程中 log4j 的默认日志等级是 error,控制台看不到 Kafka 客户端更详细的报错信息,所以先配置 resource/log4j.properties,把日志等级改成 info:
- log4j.rootLogger = info,console
-
- ### 配置输出到控制台 ###
- log4j.appender.console = org.apache.log4j.ConsoleAppender
- log4j.appender.console.Target = System.out
- log4j.appender.console.layout = org.apache.log4j.PatternLayout
- #log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{ 1 }:%L - %m%n
再运行,发现如下错误:
- java.net.UnknownHostException: kafka
- at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
- at java.net.InetAddress.getAllByName(InetAddress.java:1193)
- at java.net.InetAddress.getAllByName(InetAddress.java:1127)
- at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
- at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
- at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
- at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
- at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
- at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
- at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
- at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
- at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
- at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
- at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
- at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
- at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
- at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
- at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:788)
- at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:750)
- at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2338)
- at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1725)
- at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
- at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:446)
- at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:243)

服务器上的 Kafka 是我用 Docker 安装的,仅用于测试。
.env配置如下:
- KAFKA_LOG_RETENTION_HOURS=16
- KAFKA_BROKER_ID=0
- KAFKA_ZOOKEEPER_CONNECT=zk-host:2181
- # 这个参数不能用0.0.0.0,不然会报错,需要设置成实际访问的ip
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
查了下KAFKA_ADVERTISED_LISTENERS这个参数的解释:
深入理解KAFKA_ADVERTISED_LISTENERS_kafka_cfg_listeners_jason.zeng@1502207的博客-CSDN博客
KAFKA_ADVERTISED_LISTENERS:broker 对外发布的访问地址。这个地址会注册到 ZooKeeper 中,并通过元数据返回给客户端,客户端之后就用这个地址来连接 broker。
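所以客户端虽然用 bootstrap.servers(11.0.24.216:9092)建立了初始连接,但拿到元数据后,最终会去连接 broker 发布的地址 kafka:9092;而本机无法解析主机名 kafka,于是报出上面的 java.net.UnknownHostException: kafka,并最终导致获取分区位置超时。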
目前有两种方案:
1、在本地的hosts中配置域名指向;
- C:\Windows\System32\drivers\etc\HOSTS
- 11.0.24.216 kafka
2、修改 Kafka 的安装配置,把 KAFKA_ADVERTISED_LISTENERS 中的主机名 kafka 改成客户端实际可以访问的 IP(例如 PLAINTEXT://11.0.24.216:9092)。
我通过方案 1 成功解决。
具体代码参考(gitee上):flink-java-1.12.7: flink1.12.7的java demo,包括flink wordcount示例,如何连接kafka
docker安装kafka:Docker 安装Kafka_在奋斗的大道的博客-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。