当前位置:   article > 正文

kafka安装及使用_kafka map

kafka map

kafka

是一个开源的消息系统 由scala语言完成的

是为处理 实时数据提供一个统一的,高吞吐量的,低延迟的平台

是一个分布式的消息队列。由生产者和消费者组成的

对消息的保存,根据Topic(主题)进行归类

无论是单机kafka还是集群kafka,都依赖于zookeeper集群

让zookeeper集群保存一些元数据

保证系统的可用性

安装前提:

有正常的三台虚拟机且都配置zookeeper

安装:

解压缩到对应的文件夹之后配置环境

需配置:

首先其运行的环境变量要配置好

 

 配置中箭头所指向的以及黑点标记的,ip用自己虚拟机的

 修改好了以上配置之后就差不多完善了正常的一台主虚拟机的配置了

后面就开始配置另外两台虚拟机

分发

scp /etc/profile root@cx02:/etc/profile

scp /etc/profile root@cx03:/etc/profile

 scp -r /usr/local/kafka root@cx02:/usr/local/
 scp -r /usr/local/kafka root@cx03:/usr/local/
分别将我的环境变量所在文件和kafka所在文件夹分发给另外两台虚拟机

然后还需要修改:

环境变量都要刷新

看kafka中的myid  都有他们对应的123

  以及zookeeper.properties的最底部所在虚拟机都调成0.0.0.0防止之后报3888端口号错误

server.properties        中除了修改broker.id外还有其listeners和advertised.listeners所对应的ip地址都需要改成虚拟机自己的。

其他的就差不多没有了

kafka运行

在三台虚拟机上启动zookeeper                //启动zookeeper

zkServer.sh start

在三台主机上分别输入               //启动kafka(可能需要在bin目录在才能找到开启命令)

 kafka-server-start.sh /usr/local/kafka/config/server.properties

一些常用命令

创建主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic test

查看主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --list

删除主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --delete --topic test

查看主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --desc --topic test

创建一个生产者,开启一个新的终端(终端会被占用)

flink的数据源是kafka的消费者,生产者是web项目

kafka-console-producer.sh --broker-list 192.168.60.131:9092 --topic test

创建一个消费者

kafka-console-consumer.sh --zookeeper 192.168.60.131:2181 --topic test

 释:cx0*为主机名,即hostname名

flink整合kafka

首先仅适应于flink的流式数据

即只能在StreamExecutionEnvironment环境下kafka才能引用

以下分享一个较简单的样例

总体步骤

启动zookeeper集群,启动kafka,

创建一个接受数据的主题:userS

创建一个接受结果消息的主题:userR

使用一个终端producer,来发送消息,主题为userS

使用一个终端consumer,来接收flink的消息,主题为userR

  1. package cx.kafka
  2. import cx.model.{Student, User}
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
  6. import java.util.Properties
  7. object KafkaAsSourceDemo {
  8. def main(args: Array[String]): Unit = {
  9. //
  10. val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  11. //声明主题
  12. val topic = "userS"
  13. //声明序列化的类
  14. val valueDeserializer = new SimpleStringSchema()
  15. //声明配置文件
  16. val props = new Properties()
  17. //加载配置文件路径
  18. props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/consumer.properties"))
  19. //声明数据源
  20. val ds = env.addSource[String](new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),props))
  21. .filter(_.trim.nonEmpty)
  22. .map(perMsg=>{
  23. val arr = perMsg.split(",") //通过 , 拆分数据结果为数组
  24. val name:String = arr(0).trim //将数组的第一个数据赋值给name,trim去除前后空格
  25. val age:Long = arr(1).toLong //将数组的第二个数据赋值给age,toLong转换数据类型
  26. val phone:Long = arr(2).toLong
  27. val isOnline:String = arr(3).trim
  28. val salary:Long = arr(4).toLong
  29. User(name,age,phone,isOnline,salary)
  30. }).filter(_.age>25)
  31. .map(_.toString)
  32. //添加
  33. val producerConfig = new Properties()
  34. producerConfig.load(this.getClass.getClassLoader.getResourceAsStream("kafka/producer.properties"))
  35. ds.addSink(new FlinkKafkaProducer[String]("userR",new SimpleStringSchema(),producerConfig))
  36. env.execute(this.getClass.getSimpleName)
  37. }
  38. }

 其中需要事先准备两个kafka主题,以及idea中准备一个样例类User

  1. package cx.model
  2. case class User(name: String,age: Long,phone:Long,isOnline:String,salary:Long)
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic userR         

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic userS

运行

 

kafka-console-producer.sh --broker-list 192.168.60.131:9092 --topic userS

kafka-console-consumer.sh --zookeeper 192.168.60.131:2181 --topic userR

启动idea类

运行结果

结合代码分析,成功过滤出年龄小于35的人User类

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/597138
推荐阅读
相关标签
  

闽ICP备14008679号