当前位置:   article > 正文

kafka及jafka命令及生产者代码实现_kafka 生产者命令

kafka 生产者命令

kafka:

进入kafka目录,进入bin

列出所有topic

 ./kafka-topics.sh --list --zookeeper x1:2181,x2:2181,x3:2181


 
 创建topic
 ./kafka-topics.sh --zookeeper  x1:2181,x2:2181,x3:2181 --create --topic test --partitions 20 --replication-factor 1


发送消息
 ./kafka-console-producer.sh --broker-list  x1:9092,x2:9092,x3:9092 --topic test

消费消息
 ./kafka-console-consumer.sh --zookeeper  x1:2181,x2:2181,x3:2181 --topic test --from-beginning

 
 删除消息(必须在server.properties里配置delete.topic.enable=true)
 ./kafka-topics.sh --zookeeper x1:2181,x2:2181,x3:2181 --delete --topic test

生产者代码如下:

pom配置文件

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.9.1</artifactId>
  4. <version>0.8.1.1</version>
  5. <scope>compile</scope>
  6. <exclusions>
  7. <exclusion>
  8. <artifactId>jmxri</artifactId>
  9. <groupId>com.sun.jmx</groupId>
  10. </exclusion>
  11. <exclusion>
  12. <artifactId>jms</artifactId>
  13. <groupId>javax.jms</groupId>
  14. </exclusion>
  15. <exclusion>
  16. <artifactId>jmxtools</artifactId>
  17. <groupId>com.sun.jdmk</groupId>
  18. </exclusion>
  19. </exclusions>
  20. </dependency>
代码:

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.Properties;
  4. import kafka.javaapi.producer.Producer;
  5. import kafka.producer.KeyedMessage;
  6. import kafka.producer.ProducerConfig;
  7. /**
  8. */
  9. public class DaceKafkaProducer implements DaceProducer
  10. {
  11. // private final Producer<String, String> producer;
  12. private Producer<String, String> producer;
  13. public DaceKafkaProducer(){
  14. Properties props = new Properties();
  15. //此处配置的是kafka的端口
  16. // props.put("metadata.broker.list", "x1:9092,x2:9092,x3:9092");
  17. // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "n1:9092,n2:9092,n3:9092");
  18. props.put("zk_connect", "n1:2181,n2:2181,n3:2181");
  19. props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
  20. // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10);
  21. // //配置value的序列化类
  22. // props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  23. // //配置key的序列化类
  24. // props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  25. // props.put("request.required.acks","1");
  26. props.put("log.retention.minutes", 1);
  27. //
  28. // producer = new KafkaProducer<String, String>(props);
  29. props.put("metadata.broker.list", "x1:9092,x2:9092,x3:9092");
  30. props.put("serializer.class", "kafka.serializer.StringEncoder");
  31. props.put("log.retention.minutes", 1);
  32. props.put("request.required.acks", "1");
  33. ProducerConfig config = new ProducerConfig(props);
  34. producer = new Producer<String, String>(config);
  35. }
  36. void produce(String topic, String tableName, List<String> datas) {
  37. List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String,String>>();
  38. for (String data:datas) {
  39. messages.add(new KeyedMessage<String, String>(topic, tableName, data));
  40. }
  41. producer.send(messages);
  42. }
  43. public static void main(String[] args) {
  44. // DaceKafkaProducer producer = new DaceKafkaProducer();
  45. // for (int i = 0; i < 15; i++) {
  46. // producer.produce("replay", "test", "testlength" + i);
  47. // }
  48. }


jafka:

生产消息

  1. ./producer-console.sh --broker-list 0:x1:9092,1:x2:9092,2:x3:9092 --topic demo
  2. ./producer-console.sh --zookeeper x2:2181/jafka --topic demo
  3. ./consumer-console.sh --zookeeper x2:2181/jafka --topic demo --from-beginning


pom配置:

  1. <dependency>
  2. <groupId>com.sohu.jafka</groupId>
  3. <artifactId>jafka</artifactId>
  4. <version>1.2.2</version>
  5. </dependency>

代码:

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.Properties;
  4. import com.sohu.jafka.producer.Producer;
  5. import com.sohu.jafka.producer.ProducerConfig;
  6. import com.sohu.jafka.producer.ProducerData;
  7. import com.sohu.jafka.producer.serializer.StringEncoder;
  8. public class DaceJfkaProducer implements DaceProducer{
  9. private final Producer<String, String> producer;
  10. public DaceJfkaProducer() {
  11. Properties props = new Properties();
  12. // props.put("broker.list", "0:hadoop001.local:9092,1:hadoop002.local:9092,2:hadoop003.local:9092");
  13. props.put("zk.connect", "n2:2181/jafka");
  14. props.put("log.retention.minutes", 1);
  15. props.put("serializer.class", StringEncoder.class.getName());
  16. producer = new Producer<String, String>(new ProducerConfig(props));
  17. }
  18. public void produce(String topic, List<String> data) {
  19. producer.send(new ProducerData<String, String>(topic, data));
  20. }
  21. public static void main(String[] args) {
  22. DaceJfkaProducer producer = new DaceJfkaProducer();
  23. List<String> data = new ArrayList<String>();
  24. for (int i = 0; i < 10; i++) {
  25. data.add("test1" + i);
  26. }
  27. producer.produce("demo", data);
  28. }
  29. }

kafka流程如下:



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/759272
推荐阅读
相关标签
  

闽ICP备14008679号