赞
踩
一,下载confluent安装包,解压到linux目录,进入etc/kafka-rest/kafka-rest.properties
kafka-rest.properties配置
id=kafka-rest-test-server
schema.registry.url=http://192.168.237.136:8081
zookeeper.connect=192.168.237.136:2181
bootstrap.servers=PLAINTEXT://192.168.237.136:9092
二,进入etc/schema-registry/schema-registry.properties
schema-registry.properties配置
listeners=http://192.168.237.136:8081
kafkastore.connection.url=192.168.237.136:2181
kafkastore.topic=_schemas01
debug=false
三,注册schema服务
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"myrecord\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}]}"}' \
  http://192.168.0.102:8081/subjects/ms02/versions
其中ms02为主题名
查看所有已注册的schema
在终端输入:curl -X GET http://192.168.237.136:8081/subjects ,或者在浏览器中直接访问 http://192.168.237.136:8081/subjects
查看某一个主题对应的注册schema
curl -X GET http://192.168.0.102:8081/subjects/topics/versions
删除已经注册的schema
curl -X DELETE http://192.168.0.102:8081/subjects/topics/versions/1
其中1为版本号,要删除哪个版本,就把1换成对应的版本号。
package com.hrtj.service; /* * 注册了两个服务 * */ import java.util.Date; import java.util.Properties; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.stereotype.Service; @Service public class AvroProducer { public static final String STU_SCHEMA1="{"+ "\"type\": \"record\","+ "\"name\": \"myrecord\","+ "\"fields\":"+ "["+ "{\"name\": \"id\", \"type\": \"int\"},"+ "{\"name\":\"name\",\"type\":\"string\"}"+ "]"+ "}"; public static final String STU_SCHEMA2="{"+ "\"type\": \"record\","+ "\"name\": \"myrecord\","+ "\"fields\":"+ "["+ "{\"name\": \"id\", \"type\": \"int\"},"+ "{\"name\":\"name\",\"type\":\"string\"},"+ "{\"name\":\"age\",\"type\":\"int\"}"+ "]"+ "}"; private static final String bird_schema = "{\"type\":\"record\",\"name\":\"bird4\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}," +"{\"name\":\"timeid\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}}]}"; public void sends1() throws Exception{ Properties p = new Properties(); p.put("bootstrap.servers", "192.168.0.102:9092"); p.put(ProducerConfig.ACKS_CONFIG, "all"); p.put(ProducerConfig.RETRIES_CONFIG, 0); p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); p.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer"); p.put("schema.registry.url", "http://192.168.0.102:8081"); Parser 
parser = new Schema.Parser(); Schema schema1 = parser.parse(STU_SCHEMA1); @SuppressWarnings("resource") Producer<String, GenericRecord> producer = new KafkaProducer<>(p); for(int i=1;i<20000;i++){ Record r1 = new GenericData.Record(schema1); r1.put("id", i); r1.put("name", "studt"); ProducerRecord<String,GenericRecord> record1= new ProducerRecord<String,GenericRecord>("hr02","key",r1); producer.send(record1); } } public void sends2() throws Exception{ Properties p = new Properties(); p.put("bootstrap.servers", "hadoop4:9092"); p.put(ProducerConfig.ACKS_CONFIG, "all"); p.put(ProducerConfig.RETRIES_CONFIG, 0); p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); p.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer"); p.put("schema.registry.url", "http://192.168.237.136:8081"); Parser parser = new Schema.Parser(); Schema schema2 = parser.parse(STU_SCHEMA2); @SuppressWarnings("resource") Producer<String, GenericRecord> producer = new KafkaProducer<>(p); for(int i=1;i<900;i++){ Record r2 = new GenericData.Record(schema2); r2.put("id", (long)i); r2.put("name", "stud"); r2.put("age", 19+i); ProducerRecord<String,GenericRecord> record2 = new ProducerRecord<String,GenericRecord>("ms01","key"+i,r2); producer.send(record2,new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("acks"); } }); Thread.sleep(3000); } } public void sends3() throws Exception{ Properties p = new Properties(); p.put("bootstrap.servers", "hadoop4:9092"); p.put(ProducerConfig.ACKS_CONFIG, "all"); p.put(ProducerConfig.RETRIES_CONFIG, 0); p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); p.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer"); p.put("schema.registry.url", "http://192.168.237.136:8081"); Parser parser = new Schema.Parser(); Schema schema3 = parser.parse(bird_schema); @SuppressWarnings("resource") Producer<String, 
GenericRecord> producer = new KafkaProducer<>(p); for(int i=1;i<20;i++){ Record r3 = new GenericData.Record(schema3); r3.put("id", i); r3.put("name", "stud"); r3.put("age", 19+i); Date date = new Date(); r3.put("timeid", date); ProducerRecord<String,GenericRecord> record3 = new ProducerRecord<String,GenericRecord>("test-mysql-bird",r3); producer.send(record3); Thread.sleep(100); } } }
通过生产者把数据以对象的格式打进topic,通过ksql可以使用类sql语句查询。
package com.hrtj.service; /* * 从topic消费数据 * */ import java.util.Collections; import java.util.Properties; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.stereotype.Service; @Service public class AvroConsumer { public void receive(){ Properties p = new Properties(); p.put("bootstrap.servers", "hadoop4:9092"); p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 使用Confluent实现的KafkaAvroDeserializer p.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); p.put("group.id", "schema_test"); p.put("schema.registry.url", "http://192.168.237.136:8081"); p.put("auto.offset.reset", "earliest"); KafkaConsumer<String,GenericRecord> consumer = new KafkaConsumer<String,GenericRecord>(p); consumer.subscribe(Collections.singleton("schema03")); try{ while(true){ ConsumerRecords<String,GenericRecord> records = consumer.poll(1000); for (ConsumerRecord<String,GenericRecord> r : records) { GenericRecord stu = r.value(); System.out.println(r.key()+" : "+stu.get("id")+"\t"+stu.get("name")+"\t"+stu.get("age")); } } }catch(Exception e){ e.printStackTrace(); }finally{ System.out.println("AvroConsumer.receive()"); consumer.close(); } } }
通过record的value()获取对象,进而获取对象的每个属性值。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。