当前位置:   article > 正文

kafka入门篇-使用_kafka.scala

kafka.scala

搭建kafka

1、上传压缩包到任意节点
2、解压,配置环境变量 所有节点都配置

3、修改config/server.properties
1、broker.id=0,每一个节点broker.id 要不一样
2、zookeeper.connect=master:2181,node1:2181,node2:2181
3、log.dirs=/usr/local/soft/kafka_2.11-1.0.0/data 消息存放的位置

4、复制到其它节点
scp -r kafka_2.11-1.0.0 node2:pwd
scp -r kafka_2.11-1.0.0 node1:pwd

5、修改每个节点的broker.id master=0 node1=1 node2=2

6、启动(kafka可以不依赖于Hadoop,但是要依赖于zookeeper)
1、启动zookeeper, 需要在所有节点启动
zkServer.sh start

查看状态
 zkServer.sh status
 
3,在每台节点启动broker,  kafka是去中心化的架构   -daemon 后台启动   在所有节点启动
kafka-server-start.sh -daemon  /usr/local/soft/kafka_2.11-1.0.0/config/server.properties
  • 1
  • 2
  • 3
  • 4
  • 5

1、创建topic

–replication-factor —每一个分区的副本数量
–partition --分区数, 根据数据量设置

伪分布式的时候,副本数设置一个就可
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic test_topic1

2、查看topic描述信息
kafka-topics.sh --describe --zookeeper master:2181 --topic test_topic1

3、获取所有topic
kafka-topics.sh --list --zookeeper master:2181

4、创建控制台生产者
kafka-console-producer.sh --broker-list master:9092 --topic test_topic1

5、创建控制台消费者 --from-beginning 从头消费,, 如果不在执行消费的新的数据
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test_topic1

重置kafka
1、关闭kafka
kill -9

2、删除元数据 zk
zkCli.sh
删除预kafka有关的所有信息
ls /
rmr /config
rmr /brokers

3、删除kafka的数据 所有节点都要删除
rm -rf /usr/local/soft/kafka_2.11-1.0.0/data

4 重启
kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties

idea中使用flink结合kafka

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ShuJia01</artifactId>
        <groupId>ShuJia</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka</artifactId>


    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>


        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>


    </dependencies>


    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>





</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

idea中消费生产者生产的数据

package com.shujia.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo3KafkaProducer {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.5.201:9092")
    properties.setProperty("group.id", "test")

    //创建flink kafka 消费者
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)


    //    flinkKafkaConsumer.setStartFromEarliest()      // 尽可能从最早的记录开始
    //    flinkKafkaConsumer.setStartFromLatest()        // 从最新的记录开始
    //flinkKafkaConsumer.setStartFromTimestamp(...)  // 从指定的时间开始(毫秒)

    /**
      * 如果消费者组之前不存在,读取最新的数据
      * 如果消费者组已存在,接着之前读取数据
      *
      */
    flinkKafkaConsumer.setStartFromEarliest()  // 默认的方法

    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)


    kafkaDS.print()


    env.execute()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

在这里插入图片描述
在这里插入图片描述

消费学生信息

package com.shujia
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

object Demo3Comsumer {
  def main(args: Array[String]): Unit = {
    //1、创建消费者

    val properties = new Properties()

    //指定kafka的broker的地址
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("group.id", "asdasdd")


    /**
      * earliest
      * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      * latest
      * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      * none
      * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      *
      */


    //从最早读取数据
    properties.put("auto.offset.reset", "earliest")


    val consumer = new KafkaConsumer[String, String](properties)
    println("链接创建成功")

    //订阅topic
    val topics = new util.ArrayList[String]()
    topics.add("student2")
    consumer.subscribe(topics)


    while (true) {
      //消费数据
      val records: ConsumerRecords[String, String] = consumer.poll(1000)
      println("正在消费数据")

      //获取读到的所有数据
      val iterator: util.Iterator[ConsumerRecord[String, String]] = records.iterator()

      while (iterator.hasNext) {
        //获取一行数据
        val record: ConsumerRecord[String, String] = iterator.next()

        val topic: String = record.topic()
        val patition: Int = record.partition()
        val offset: Long = record.offset()
        val key: String = record.key()
        val value: String = record.value()
        //默认是系统时间
        val ts: Long = record.timestamp()
        println(s"$topic\t$patition\t$offset\t$key\t$value\t$ts")

      }

    }


    consumer.close()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

idea中生产数据

package com.shujia
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object Demo1kafkaproducer {
  def main(args: Array[String]): Unit = {
    /**
      * 1、创建kfaka链接
      * 创建生产者
      */

    val properties = new Properties()

    //指定kafka的broker的地址
    properties.setProperty("bootstrap.servers", "master:9092")
    //key和value序列化类
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


    //生产者
    val producer = new KafkaProducer[String, String](properties)


    //生产数据
    //topic 不存在会自动创建一个分区为1副本为1的topic
    val record = new ProducerRecord[String, String]("test1", "java")

    producer.send(record)

    //将数据刷到kafka中
    producer.flush()


    //关闭链接
    producer.close()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在这里插入图片描述

生产学生信息

package com.shujia
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source
object Demo2Studentkafka {
  def main(args: Array[String]): Unit = {
    /**
      * 1、创建kfaka链接
      * 创建生产者
      *
      */

    val properties = new Properties()

    //指定kafka的broker的地址
    properties.setProperty("bootstrap.servers", "master:9092")
    //key和value序列化类
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    //生产者
    val producer = new KafkaProducer[String, String](properties)

    //读取学生表
    Source
      .fromFile("data/students.txt")
      .getLines()
      .foreach(student => {

        //将用一个班级的学生打入同一个分区
        val clazz: String = student.split(",")(4)
        val partition: Int = math.abs(clazz.hashCode) % 2

        //将数据发送到kafka

        //kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 2 --topic student2
        val record = new ProducerRecord[String, String]("student2", partition, null, student)

        producer.send(record)
        producer.flush()

      })

    //关闭链接
    producer.close()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/377761
推荐阅读
相关标签
  

闽ICP备14008679号