
Registering kafka-rest and Schema Registry services (schema.registry.url)

Configuration

1. Download the Confluent package, extract it into a directory on the Linux host, and edit etc/kafka-rest/kafka-rest.properties:

id=kafka-rest-test-server
schema.registry.url=http://192.168.237.136:8081
zookeeper.connect=192.168.237.136:2181
bootstrap.servers=PLAINTEXT://192.168.237.136:9092

2. Edit etc/schema-registry/schema-registry.properties:

listeners=http://192.168.237.136:8081
kafkastore.connection.url=192.168.237.136:2181
kafkastore.topic=_schemas01
debug=false
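
With both services running (started via schema-registry-start and kafka-rest-start from the Confluent bin directory), any HTTP client can produce Avro records through the REST proxy. Below is a minimal sketch using only the JDK's HttpURLConnection; it assumes kafka-rest is listening on its default port 8082 (the properties above don't change it) and reuses the myrecord schema and the ms02 topic from this article. The class name is illustrative.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RestProxyProduceDemo {
	public static void main(String[] args) throws Exception {
		// POST /topics/{topic} with the v2 Avro content type.
		URL url = new URL("http://192.168.237.136:8082/topics/ms02");
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setRequestMethod("POST");
		conn.setRequestProperty("Content-Type", "application/vnd.kafka.avro.v2+json");
		conn.setDoOutput(true);

		String schema = "{\"type\": \"record\", \"name\": \"myrecord\", \"fields\": ["
				+ "{\"name\": \"id\", \"type\": \"int\"},"
				+ "{\"name\": \"name\", \"type\": \"string\"}]}";
		// value_schema carries the Avro schema as an escaped JSON string;
		// the proxy registers it with the Schema Registry on our behalf.
		String body = "{\"value_schema\": \"" + schema.replace("\"", "\\\"")
				+ "\", \"records\": [{\"value\": {\"id\": 1, \"name\": \"studt\"}}]}";

		try (OutputStream out = conn.getOutputStream()) {
			out.write(body.getBytes(StandardCharsets.UTF_8));
		}
		System.out.println("HTTP " + conn.getResponseCode());
	}
}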

Registering schemas

3. Register a schema with the Schema Registry (note that the value of the schema field must be the Avro schema as an escaped JSON string):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"myrecord\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}]}"}' \
  http://192.168.0.102:8081/subjects/ms02/versions

Here ms02 is the subject name.
List all registered subjects:
curl -X GET http://192.168.237.136:8081/subjects
(or open http://192.168.237.136:8081/subjects directly in a browser)

List the schema versions registered for one subject (replace topics with the subject name):
curl -X GET http://192.168.0.102:8081/subjects/topics/versions

Delete a registered schema version:
curl -X DELETE http://192.168.0.102:8081/subjects/topics/versions/1
Here 1 is the version number; swap it for whichever version you want to delete.
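
The same operations are available from Java through Confluent's client library. A minimal sketch, assuming the io.confluent:kafka-schema-registry-client dependency is on the classpath (the class name is illustrative):

import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class SchemaRegistryClientDemo {
	public static void main(String[] args) throws Exception {
		// The second argument caps how many schema objects the client caches.
		SchemaRegistryClient client =
				new CachedSchemaRegistryClient("http://192.168.0.102:8081", 100);

		Schema schema = new Schema.Parser().parse(
				"{\"type\": \"record\", \"name\": \"myrecord\", \"fields\": ["
				+ "{\"name\": \"id\", \"type\": \"int\"},"
				+ "{\"name\": \"name\", \"type\": \"string\"}]}");

		// Equivalent to the POST above; returns the global schema id.
		int id = client.register("ms02", schema);
		System.out.println("registered schema id: " + id);

		// Equivalent to GET /subjects.
		System.out.println("subjects: " + client.getAllSubjects());
	}
}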

Code

package com.hrtj.service;
/*
 * Registers two schemas by producing Avro records
 * */
import java.util.Date;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Service;

@Service
public class AvroProducer {

	public static final String STU_SCHEMA1="{"+
			"\"type\": \"record\","+
			"\"name\": \"myrecord\","+
			"\"fields\":"+
				"["+
					"{\"name\": \"id\", \"type\": \"int\"},"+
					"{\"name\":\"name\",\"type\":\"string\"}"+
				"]"+
		"}";
	public static final String STU_SCHEMA2="{"+
			"\"type\": \"record\","+
			"\"name\": \"myrecord\","+
			"\"fields\":"+
				"["+
					"{\"name\": \"id\", \"type\": \"int\"},"+
					"{\"name\":\"name\",\"type\":\"string\"},"+
					"{\"name\":\"age\",\"type\":\"int\"}"+
				"]"+
		"}";
	
	private static final String bird_schema = "{\"type\":\"record\",\"name\":\"bird4\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},"
	+"{\"name\":\"timeid\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}}]}";
	
	public void sends1() throws Exception{
		Properties p = new Properties();
		p.put("bootstrap.servers", "192.168.0.102:9092");
		p.put(ProducerConfig.ACKS_CONFIG, "all");
		p.put(ProducerConfig.RETRIES_CONFIG, 0);
		p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
		p.put("schema.registry.url", "http://192.168.0.102:8081");
		
		
		Parser parser = new Schema.Parser();
		Schema schema1 = parser.parse(STU_SCHEMA1);
		
		@SuppressWarnings("resource")
		Producer<String, GenericRecord> producer = new KafkaProducer<>(p);
		for(int i=1;i<20000;i++){
			Record r1 = new GenericData.Record(schema1);
			r1.put("id", i);
			r1.put("name", "studt");
			
			ProducerRecord<String,GenericRecord> record1= new ProducerRecord<String,GenericRecord>("hr02","key",r1);
			producer.send(record1);
		}
		// Flush buffered records and release the producer before returning.
		producer.flush();
		producer.close();
	}
	
	public void sends2() throws Exception{
		Properties p = new Properties();
		p.put("bootstrap.servers", "hadoop4:9092");
		p.put(ProducerConfig.ACKS_CONFIG, "all");
		p.put(ProducerConfig.RETRIES_CONFIG, 0);
		p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
		p.put("schema.registry.url", "http://192.168.237.136:8081");
		Parser parser = new Schema.Parser();
		Schema schema2 = parser.parse(STU_SCHEMA2);
		@SuppressWarnings("resource")
		Producer<String, GenericRecord> producer = new KafkaProducer<>(p);
		for(int i=1;i<900;i++){
			Record r2 = new GenericData.Record(schema2);
			r2.put("id", i); // the schema declares id as int, so an int (not a long) is required
			r2.put("name", "stud");
			r2.put("age", 19+i);
			ProducerRecord<String,GenericRecord> record2 = new ProducerRecord<String,GenericRecord>("ms01","key"+i,r2);
			producer.send(record2,new Callback() {
				@Override
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					System.out.println("acks");
				}
			});
			Thread.sleep(3000);
			
		}
		producer.flush();
		producer.close();
	}
	
	public void sends3() throws Exception{
		Properties p = new Properties();
		p.put("bootstrap.servers", "hadoop4:9092");
		p.put(ProducerConfig.ACKS_CONFIG, "all");
		p.put(ProducerConfig.RETRIES_CONFIG, 0);
		p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
		p.put("schema.registry.url", "http://192.168.237.136:8081");
		Parser parser = new Schema.Parser();
		Schema schema3 = parser.parse(bird_schema);
		@SuppressWarnings("resource")
		Producer<String, GenericRecord> producer = new KafkaProducer<>(p);
		for(int i=1;i<20;i++){
			Record r3 = new GenericData.Record(schema3);
			r3.put("id", i);
			r3.put("name", "stud");
			r3.put("age", 19+i);
			Date date = new Date();
			// timeid is a long with logicalType timestamp-millis, so pass epoch millis
			r3.put("timeid", date.getTime());
			ProducerRecord<String,GenericRecord> record3 = new ProducerRecord<String,GenericRecord>("test-mysql-bird",r3);
			producer.send(record3);
			Thread.sleep(100);
		}
		producer.flush();
		producer.close();
	}
}
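
One behavior worth calling out: KafkaAvroSerializer registers the record's schema with the Schema Registry automatically on first send (under the subject <topic>-value), which is why the three sends methods above work without any explicit curl registration. If you would rather have sends fail when a schema is not already registered, the serializer exposes a switch for that; a one-line sketch, assuming the standard Confluent serializer property:

// Default is true; false makes sends fail unless the schema already exists.
p.put("auto.register.schemas", false);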

The producer writes data into the topic as structured objects, and KSQL can then query it with SQL-like statements.

package com.hrtj.service;
/*
 * Consumes data from the topic
 * */
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Service;

@Service
public class AvroConsumer {
	public void receive(){
		Properties p = new Properties();
		p.put("bootstrap.servers", "hadoop4:9092");
		p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer implementation
        p.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
		p.put("group.id", "schema_test");
		p.put("schema.registry.url", "http://192.168.237.136:8081");
		p.put("auto.offset.reset", "earliest");
		
		KafkaConsumer<String,GenericRecord> consumer = new KafkaConsumer<String,GenericRecord>(p);
		consumer.subscribe(Collections.singleton("schema03"));
		
		try{
			while(true){
				ConsumerRecords<String,GenericRecord> records = consumer.poll(1000);
				for (ConsumerRecord<String,GenericRecord> r : records) {
					GenericRecord stu = r.value();
					System.out.println(r.key()+" : "+stu.get("id")+"\t"+stu.get("name")+"\t"+stu.get("age"));
				}
			}			
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			System.out.println("AvroConsumer.receive()");
			consumer.close();
		}
	}
}

Call value() on each record to get the object, then read each of its field values from it.
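
To tie the two classes together, here is a minimal driver sketch (the AvroDemo class name is illustrative; note that receive() subscribes to topic schema03, so a producer must write to that topic for output to appear):

package com.hrtj.service;

public class AvroDemo {
	public static void main(String[] args) throws Exception {
		// Produce a batch first, then start consuming; in practice the two
		// sides would normally run in separate JVMs or threads.
		new AvroProducer().sends1();
		new AvroConsumer().receive();
	}
}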
