Producer side
package Kafka

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    // Topic the data is sent to
    val topic = "KafkaSimple3"
    // Producer configuration
    val properties = new Properties()
    // Encoder used to serialize message payloads
    properties.put("serializer.class", "kafka.serializer.StringEncoder")
    // Kafka broker list
    properties.put("metadata.broker.list", "192.168.42.132:9092,192.168.42.134:9092,192.168.42.135:9092")
    // Acknowledgement level: 0 = no ack, 1 = leader ack, -1 = all in-sync replicas
    properties.put("request.required.acks", "1")
    // Custom partitioner class
    properties.put("partitioner.class", "Kafka.CustomPartitioner")

    val config: ProducerConfig = new ProducerConfig(properties)
    // Create a producer instance
    val producer: Producer[String, String] = new Producer(config)

    // Produce some sample data
    for (i <- 1 to 10000) {
      val msg = s"$i: Producer send data"
      producer.send(new KeyedMessage[String, String](topic, msg))
    }
    // Release sockets and background threads
    producer.close()
  }
}
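The config above points the producer at Kafka.CustomPartitioner, but the post never shows that class. A minimal sketch of what it could look like, assuming the old kafka.producer.Partitioner interface used by this 0.8/0.9-era Scala client; the hashing scheme is illustrative, not from the original:

package Kafka

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties

// Hypothetical partitioner body; the original post only references the class name.
// The old producer loads this class reflectively and expects a constructor
// that accepts VerifiableProperties.
class CustomPartitioner(props: VerifiableProperties) extends Partitioner {
  override def partition(key: Any, numPartitions: Int): Int = {
    if (key == null) 0
    // Mask the sign bit so the result is always a valid partition index
    else (key.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}

Note that the loop above sends KeyedMessage(topic, msg) with no key; the old producer only consults the partitioner when a key is present, so to actually exercise it you would send new KeyedMessage[String, String](topic, someKey, msg) instead.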

Consumer side
package Kafka

import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}

import kafka.consumer._
import kafka.message.MessageAndMetadata

import scala.collection.mutable

class KafkaConsumer(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable {
  override def run(): Unit = {
    val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
    // hasNext() blocks until a new message arrives
    while (it.hasNext()) {
      val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
      val topic: String = data.topic
      val partition: Int = data.partition
      val offset: Long = data.offset
      val msg = new String(data.message())
      println(s"Consumer: $consumer, Topic: $topic, Partition: $partition, Offset: $offset, msg: $msg")
    }
  }
}

object KafkaConsumerTest {
  def main(args: Array[String]): Unit = {
    // Topic to read data from
    val topic = "KafkaSimple3"

    // Map of topic -> number of streams (threads) to consume it with
    val topics = new mutable.HashMap[String, Int]()
    topics.put(topic, 2)

    // Consumer configuration
    val properties = new Properties()
    // Consumer group id
    properties.put("group.id", "zwj-consumer-group")
    // ZooKeeper address list; note: the value must not contain spaces
    properties.put("zookeeper.connect", "192.168.42.132:2181,192.168.42.134:2181,192.168.42.135:2181")
    // Where to start when ZooKeeper has no offset for this group, or the stored offset is out of range
    properties.put("auto.offset.reset", "smallest")
    // Wrap the settings in a ConsumerConfig
    val config = new ConsumerConfig(properties)

    // Create the consumer instance; its streams block while no data is available
    val consumer: ConsumerConnector = Consumer.create(config)

    // Get one stream per requested thread for each topic
    val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumer.createMessageStreams(topics)

    // Streams for the topic we care about (an empty list if the topic is absent)
    val stream: List[KafkaStream[Array[Byte], Array[Byte]]] = streams.getOrElse(topic, Nil)

    // Fixed-size thread pool: submit one task per stream
    val pool: ExecutorService = Executors.newFixedThreadPool(3)
    for (i <- 0 until stream.size) {
      pool.execute(new KafkaConsumer(s"Consumer: $i", stream(i)))
    }
  }
}
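The example starts its consumer threads but never releases anything when the process ends. A minimal cleanup sketch, appended at the end of main, assuming a Ctrl-C style exit (the hook itself is an addition, not part of the original post):

    // Hypothetical shutdown hook: closes the ZooKeeper/broker connections,
    // which also unblocks the stream iterators, then stops the worker threads.
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = {
        consumer.shutdown()
        pool.shutdown()
      }
    }))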

pom.xml (it still contains quite a few dependencies that are unnecessary here, left over from earlier code)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zwj</groupId>
    <artifactId>ScalaTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.3</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.10.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>0.10.0.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Strip jar signature files so the shaded jar is not rejected at runtime -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.qf.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
