赞
踩
进入kafka目录,进入bin
列出所有topic
./kafka-topics.sh --list --zookeeper x1:2181,x2:2181,x3:2181
./kafka-topics.sh --zookeeper x1:2181,x2:2181,x3:2181 --create --topic test --partitions 20 --replication-factor 1
./kafka-console-producer.sh --broker-list x1:9092,x2:9092,x3:9092 --topic test
./kafka-console-consumer.sh --zookeeper x1:2181,x2:2181,x3:2181 --topic test --from-beginning
./kafka-topics.sh --zookeeper x1:2181,x2:2181,x3:2181 --delete --topic test
生产者代码如下:
pom配置文件
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.1</artifactId>
- <version>0.8.1.1</version>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <artifactId>jmxri</artifactId>
- <groupId>com.sun.jmx</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jms</artifactId>
- <groupId>javax.jms</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jmxtools</artifactId>
- <groupId>com.sun.jdmk</groupId>
- </exclusion>
- </exclusions>
- </dependency>
代码:
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
-
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
-
- /**
- */
- public class DaceKafkaProducer implements DaceProducer
- {
- // private final Producer<String, String> producer;
-
- private Producer<String, String> producer;
- public DaceKafkaProducer(){
- Properties props = new Properties();
- //此处配置的是kafka的端口
- // props.put("metadata.broker.list", "x1:9092,x2:9092,x3:9092");
- // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "n1:9092,n2:9092,n3:9092");
- props.put("zk_connect", "n1:2181,n2:2181,n3:2181");
- props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
- // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10);
- // //配置value的序列化类
- // props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // //配置key的序列化类
- // props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // props.put("request.required.acks","1");
- props.put("log.retention.minutes", 1);
- //
- // producer = new KafkaProducer<String, String>(props);
-
- props.put("metadata.broker.list", "x1:9092,x2:9092,x3:9092");
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("log.retention.minutes", 1);
- props.put("request.required.acks", "1");
- ProducerConfig config = new ProducerConfig(props);
-
- producer = new Producer<String, String>(config);
- }
-
- void produce(String topic, String tableName, List<String> datas) {
- List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String,String>>();
- for (String data:datas) {
- messages.add(new KeyedMessage<String, String>(topic, tableName, data));
- }
- producer.send(messages);
- }
-
- public static void main(String[] args) {
- // DaceKafkaProducer producer = new DaceKafkaProducer();
- // for (int i = 0; i < 15; i++) {
- // producer.produce("replay", "test", "testlength" + i);
- // }
- }
生产消息
- ./producer-console.sh --broker-list 0:x1:9092,1:x2:9092,2:x3:9092 --topic demo
-
-
- ./producer-console.sh --zookeeper x2:2181/jafka --topic demo
-
-
- ./consumer-console.sh --zookeeper x2:2181/jafka --topic demo --from-beginning
pom配置:
- <dependency>
- <groupId>com.sohu.jafka</groupId>
- <artifactId>jafka</artifactId>
- <version>1.2.2</version>
- </dependency>
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
-
-
-
-
-
- import com.sohu.jafka.producer.Producer;
- import com.sohu.jafka.producer.ProducerConfig;
- import com.sohu.jafka.producer.ProducerData;
- import com.sohu.jafka.producer.serializer.StringEncoder;
-
- public class DaceJfkaProducer implements DaceProducer{
-
- private final Producer<String, String> producer;
-
- public DaceJfkaProducer() {
- Properties props = new Properties();
- // props.put("broker.list", "0:hadoop001.local:9092,1:hadoop002.local:9092,2:hadoop003.local:9092");
- props.put("zk.connect", "n2:2181/jafka");
- props.put("log.retention.minutes", 1);
- props.put("serializer.class", StringEncoder.class.getName());
- producer = new Producer<String, String>(new ProducerConfig(props));
- }
-
- public void produce(String topic, List<String> data) {
- producer.send(new ProducerData<String, String>(topic, data));
- }
-
- public static void main(String[] args) {
- DaceJfkaProducer producer = new DaceJfkaProducer();
- List<String> data = new ArrayList<String>();
- for (int i = 0; i < 10; i++) {
- data.add("test1" + i);
- }
- producer.produce("demo", data);
- }
- }
kafka流程如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。