
Kafka API Practice (scala.actors.threadpool / pom)


Producer side

The producer below uses the old Scala producer API (kafka.producer.*) that ships in the kafka_2.10 dependency from the pom at the end.

package Kafka

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    // Target topic the data will be sent to
    val topic = "KafkaSimple3"
    // Configuration properties
    val properties = new Properties()
    // Encoder class used to serialize the message payload
    properties.put("serializer.class", "kafka.serializer.StringEncoder")
    // Kafka broker list
    properties.put("metadata.broker.list", "192.168.42.132:9092,192.168.42.134:9092,192.168.42.135:9092")
    // Whether the broker must acknowledge a send: 0, 1, or -1
    properties.put("request.required.acks", "1")
    // Custom partitioner class (a sketch of it follows this listing)
    properties.put("partitioner.class", "Kafka.CustomPartitioner")
    val config: ProducerConfig = new ProducerConfig(properties)
    // Create a producer instance
    val producer: Producer[String, String] = new Producer(config)
    // Produce some sample data
    for (i <- 1 to 10000) {
      val msg = s"$i: Producer send data"
      producer.send(new KeyedMessage[String, String](topic, msg))
    }
    // Release the producer's network resources when done
    producer.close()
  }
}
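
The properties above point partitioner.class at Kafka.CustomPartitioner, a class the article never shows. A minimal sketch of what it could look like, assuming the old kafka.producer API used above (the hashing logic is illustrative, not the author's actual code):

package Kafka

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties

// Hypothetical sketch of the custom partitioner referenced in the producer config.
// The one-argument constructor taking VerifiableProperties is required, because
// the old producer instantiates the class reflectively with its properties.
class CustomPartitioner(props: VerifiableProperties) extends Partitioner {
  override def partition(key: Any, numPartitions: Int): Int = {
    // Hash the key onto a partition; route null keys to partition 0.
    if (key == null) 0 else math.abs(key.hashCode) % numPartitions
  }
}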

Consumer side

The consumer uses the old high-level, Zookeeper-based consumer API; each KafkaStream is handed to its own thread.

package Kafka

import java.util.Properties

import kafka.consumer._
import kafka.message.MessageAndMetadata

import scala.actors.threadpool.{ExecutorService, Executors}
import scala.collection.mutable

class KafkaConsumer(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable {
  override def run(): Unit = {
    val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
    while (it.hasNext()) {
      val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
      val topic: String = data.topic
      val partition: Int = data.partition
      val offset: Long = data.offset
      val msg = new String(data.message())
      println(s"Consumer: $consumer, Topic: $topic, Partition: $partition, Offset: $offset, msg: $msg")
    }
  }
}

object KafkaConsumerTest {
  def main(args: Array[String]): Unit = {
    // Topic to read from
    val topic = "KafkaSimple3"
    // Map of topic -> number of streams (threads) to consume it with
    val topics = new mutable.HashMap[String, Int]()
    topics.put(topic, 2)
    // Configuration properties
    val properties = new Properties()
    // Consumer group id
    properties.put("group.id", "zwj-consumer-group")
    // Zookeeper address list; note: the value must not contain spaces
    properties.put("zookeeper.connect", "192.168.42.132:2181,192.168.42.134:2181,192.168.42.135:2181")
    // Initial offset to use when Zookeeper has no offset, or the stored offset is out of range
    properties.put("auto.offset.reset", "smallest")
    // Wrap the configuration in a ConsumerConfig
    val config = new ConsumerConfig(properties)
    // Create the consumer; its threads block and wait when there is no data
    val consumer: ConsumerConnector = Consumer.create(config)
    // Ask for the streams of the requested topics
    val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumer.createMessageStreams(topics)
    // Get the list of streams for our topic (empty if the topic is missing).
    // Note: the original looped over the Option's size (always 0 or 1), so only
    // one of the two requested streams was ever consumed; iterate the list itself.
    val streamList: List[KafkaStream[Array[Byte], Array[Byte]]] = streams.getOrElse(topic, Nil)
    // Fixed-size thread pool; one worker per stream
    val pool: ExecutorService = Executors.newFixedThreadPool(3)
    for (i <- streamList.indices) {
      pool.execute(new KafkaConsumer(s"Consumer: $i", streamList(i)))
    }
  }
}
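
As written, the example never releases anything: the thread pool keeps the JVM alive and the Zookeeper session stays open. A minimal sketch of a clean shutdown, to be placed at the end of main, assuming the consumer and pool values created above:

// Minimal sketch: release resources when the JVM is asked to exit (e.g. Ctrl+C).
// `consumer` and `pool` refer to the values created in KafkaConsumerTest.main.
sys.addShutdownHook {
  consumer.shutdown() // close the Zookeeper session and stop the fetcher threads
  pool.shutdown()     // let each worker finish its current message, then stop
}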

pom.xml (it still carries quite a few dependencies that this example does not need, left over from earlier code)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zwj</groupId>
    <artifactId>ScalaTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.3</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files so the shaded jar does not
                                         fail verification with a SecurityException. The
                                         original include/exclude combination was
                                         contradictory; this is the standard filter. -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.qf.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
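
A note on the title's scala.actors.threadpool: that legacy package is not part of the Kafka artifacts. If the consumer's import fails to resolve on Scala 2.10, it should come from the separate scala-actors module (an assumption worth verifying against your Scala distribution), added roughly as below; the dependency-free alternative is to import java.util.concurrent.{ExecutorService, Executors} instead, which drops into the consumer code unchanged.

<!-- Assumption: only needed if scala.actors.threadpool does not already resolve;
     for Scala 2.10 the actors classes ship in the separate scala-actors module. -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-actors</artifactId>
    <version>${scala.version}</version>
</dependency>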

 
