赞
踩
在server.properties文件中配置kafka连接zookeeper相关信息
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
在zookeeper.properties中配置zookeeper所需配置
# 数据文件保存地址
dataDir=/tmp/zookeeper
# 客户端端口
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# 设置此功能端口将不在冲突
admin.enableServer=false
# admin.serverPort=8080
windows下载kafka二进制包到本机:http://kafka.apache.org/downloads 2、在config下面的server.properties文件,修改: listeners=PLAINTEXT://localhost:9092 log.dirs=F:\kafka_2.13-2.5.0\logs 3、在bin同级目录下打开shell窗口,启动kafka: .\bin\windows\kafka-server-start.bat .\config\server.properties 4、创建主题 查看可用主题 .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 5、删除指定topic .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic topic_kedacom_icms_alarm_jy_3725 5.1、如果出现临时存储的topic需要到zookeeper删除指定的topic #查看存储的topic ls /brokers/topics #删除指定的topic rmr /brokers/topics/topicName 6、另起窗口,开启指定topic .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_kedacom_icms_alarm_jy_3725 7、另起窗口、开启生产端 .\bin\windows\kafka-console-producer.bat --broker-list 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_jy_3725 8、另起窗口,开启消费端 chcp 65001 .\bin\windows\kafka-console-consumer.bat --bootstrap-server 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_sj_3725 --from-beginning 如果遇到文本过长 指令识别错误,是因为存放目录过长不规范引起
#在选择版本,高版本会提示缺少anntnationprocess... <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> </dependencies>
/** * @Auther: lyp * @Date: 2021/11/22 15:46 */ @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${bootstrap.servers}") private String bootstrapServers; public KafkaProducerConfig(){ System.out.println("kafka--------------------------------生产配置"); } /** * 创建生产值消息工厂 */ @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory(producerProperties()); } /** * 生产基本配置 */ @Bean public Map<String, Object> producerProperties() { Map<String, Object> props = new HashMap<String, Object>(); //设置kafka访问地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //消息转化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //重试次数 props.put(ProducerConfig.RETRIES_CONFIG,1); //分批处理内存设置 props.put(ProducerConfig.BATCH_SIZE_CONFIG,1048576); props.put(ProducerConfig.LINGER_MS_CONFIG,1); //使用内存配置 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432L); //确认标志符使用配置 props.put(ProducerConfig.ACKS_CONFIG,"all"); return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true); kafkaTemplate.setDefaultTopic(KafkaSendEnum.ALARM_WARN_PUSH.getTopic()); return kafkaTemplate; } }
package com.huating.jfp.msg.api.kafka.config; import com.huating.jfp.msg.api.kafka.construct.KafkaConsumerEnum; import com.huating.jfp.msg.api.kafka.listener.KafkaConsumerListener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; /** * @author lyp * @ClassName KafkaConsumerConfig * @description: 消费者配置 * @datetime 2022年 07月 20日 9:15 */ @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${bootstrap.servers}") private String bootstrapServers; public KafkaConsumerConfig() { System.out.println("kafka消费者配置加载..."); } public Map<String, Object> consumerProperties() { Map<String, Object> props = new HashMap<String, Object>(); //Kafka服务地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //消费组 props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerEnum.SD_SJ.getGroupId()); //关闭自动提交位移 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //设置间隔时间,默认5000ms props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); //Key反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //Value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区 下的数据 //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<String, String>(consumerProperties()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public KafkaConsumerListener kafkaConsumerListener() { return new KafkaConsumerListener(); } }
/** * @author lyp */ public class KafkaTopicUtil { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicUtil.class); /** * 功能描述:创建topic,并返回创建结果 * @param: topicName * @return: boolean * @auther: lyp * @date: 2021/11/12 16:06 */ public static boolean createTopics(String bootstrapServers,String topicName,int partitions,short replication) { boolean res = false; try { Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); AdminClient adminClient = KafkaAdminClient.create(properties); NewTopic newTopic = new NewTopic(topicName, partitions, replication); adminClient.createTopics(Arrays.asList(newTopic)); logger.info("创建Topic:"+topicName+"成功!"); res = true; } catch (Exception e) { e.printStackTrace(); logger.info("创建异常!"); } return res; } /** * 功能描述:获取当前kafka所存在的topic列表 * @return: set * @auther: lyp * @date: 2021/11/12 16:07 */ public static Set<String> getTopics(String bootstrapServers){ Set<String> nameSet = new HashSet<>(); try { Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); AdminClient adminClient = KafkaAdminClient.create(properties); ListTopicsResult listTopicsResult = adminClient.listTopics(); KafkaFuture<Set<String>> names = listTopicsResult.names(); nameSet = names.get(); } catch (Exception e) { e.printStackTrace(); } return nameSet; } }
public interface KafkaProduceService {
/**设备报警消息发送*/
boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo);
}
/** * @author lyp */ @Service("kafkaProducerService") public class KafkaProducerServiceImpl implements KafkaProduceService { private static final Logger logger = LoggerFactory.getLogger(KafkaProduceService.class); @Value("${bootstrap.servers}") private String bootstrapServers; @Value("${topic.name}") private String topicName; @Value("${srcUnit.code}") private String srcUnitCode; @Value("${srcUnit.name}") private String srcUnitName; @Override public boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo) { boolean res = false; Map<String, Object> reportData = new HashMap<>(); reportData.put("command","reportAlarm"); deviceWarnInfo.setSrcUnitCode(srcUnitCode); deviceWarnInfo.setSrcUnitName(srcUnitName); reportData.put("data",deviceWarnInfo); //判断是否存在当前主题 Set<String> topics = KafkaTopicUtil.getTopics(bootstrapServers); if (!topics.contains(KafkaSendEnum.ALARM_WARN_PUSH.getTopic())){ if (!KafkaTopicUtil.createTopics(bootstrapServers,topicName,1,(short)1)){ logger.info("topic创建失败,消息发送不成功!"); return res; } } KafkaTemplate kafkaTemplate = SpringContextUtil.getBean("kafkaTemplate"); ListenableFuture send = kafkaTemplate.sendDefault(topicName, JSONArray.toJSONString(reportData)); send.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { logger.error(ex.getMessage()+"发送失败!原因:"+ex.getCause()); System.out.println("发送失败!"); } @Override public void onSuccess(Object result) { logger.info("消息发送成功"+result.toString()); System.out.println("发送成功!"); } }); return res; } }
package com.huating.jfp.msg.api.kafka.entity; /** * @author lyp * @ClassName MesBody * @description: 消息实体 * @datetime 2022年 07月 21日 14:48 */ @Data public class MesBody { //类型标记字段 private String command; //消息实体字段 private String data; }
package com.huating.jfp.msg.api.kafka.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import com.alibaba.fastjson.JSONObject; import com.huating.jfp.msg.api.kafka.construct.KafkaMesType; import com.huating.jfp.msg.api.kafka.construct.KafkaTopics; import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService; import com.huating.jfp.msg.api.kafka.entity.InspectorIssue; import com.huating.jfp.msg.api.kafka.entity.MesBody; import com.huating.jfp.msg.api.kafka.entity.Notice; /** * @author lyp * @ClassName KafkaConsumerListener * @description: 主题监听 * @datetime 2022年 07月 20日 9:27 */ public class KafkaConsumerListener { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class); @Autowired private KafkaConsumerService consumerService; /** * 功能描述: 监听指定topic,多个使用, * groupId:分组id * topics:监听当前topic数组 * topic:监听单个topic */ @KafkaListener(groupId = "${group.id}",topics = "#{'${consumer.topics}'.split(',')}",containerFactory = "") public void listener(ConsumerRecord<String, String> consumerRecord) { logger.info("开始消费" + KafkaTopics.SD_JY_DUTY_TOPIC.getTopicName() + "的消息{}", consumerRecord.value()); MesBody mesBody = JSONObject.parseObject(consumerRecord.value(), MesBody.class); logger.error("kafka监听-当前消息类型:"+mesBody.getCommand()); //督查督办 if (mesBody.getCommand().equals(KafkaMesType.SD_INSPECTOR_ISSUE.getMesCode()) || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_DISPOSE.getMesCode()) || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_RES.getMesCode())) { logger.error("督查督办监听消息处理开始----->----->"); InspectorIssue inspectorIssue = JSONObject.parseObject(mesBody.getData(), InspectorIssue.class); consumerService.inspectorListener(inspectorIssue); } //通知通报 if (mesBody.getCommand().equals(KafkaMesType.SD_NOTICE_ISSUE.getMesCode())) { logger.error("通知通报开始监听"); Notice notice = JSONObject.parseObject(mesBody.getData(), Notice.class); consumerService.noticeListener(notice); } } }
package com.huating.jfp.msg.api.kafka.consumer.service; import com.huating.jfp.msg.api.kafka.entity.InspectorIssue; import com.huating.jfp.msg.api.kafka.entity.Notice; /** * @author lyp */ public interface KafkaConsumerService { /** * 功能描述: 督查下发 督查办结监听处理 * * @param inspectorIssue */ void inspectorListener(InspectorIssue inspectorIssue); /** * 功能描述: 通知通报下发监听 * * @param notice */ void noticeListener(Notice notice); }
package com.huating.jfp.msg.api.kafka.consumer.service.impl; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import com.huating.jfp.common.dao.MsgConfigureDao; import com.huating.jfp.common.dao.MsgDao; import com.huating.jfp.common.entity.Msg; import com.huating.jfp.common.entity.MsgConfigure; import com.huating.jfp.common.entity.MsgReceive; import com.huating.jfp.common.service.MsgReceiveService; import com.huating.jfp.core.base.ViewPublicRewrite; import com.huating.jfp.msg.api.http.servcie.HttpRequestService; import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService; import com.huating.jfp.msg.api.kafka.dao.InspectorEventMapper; import com.huating.jfp.msg.api.kafka.dao.NoticeMapper; import com.huating.jfp.msg.api.kafka.entity.*; import com.huating.jfp.msg.api.kafka.producer.service.KafkaProducerService; import com.huating.jfp.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * @author lyp * @ClassName KafkaConsumerServiceImpl * @description: 消费实现 * @datetime 2022年 07月 20日 9:13 */ @Service public class KafkaConsumerServiceImpl implements KafkaConsumerService { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class); private static final String SRC_UNIT_CODE = "gaol_code"; private static final String SRC_UNIT_NAME = "gaol_name"; @Autowired private InspectorEventMapper inspectorEventMapper; @Autowired private HttpRequestService httpRequestService; @Autowired private NoticeMapper noticeMapper; @Autowired private MsgDao msgMapper; @Autowired private MsgConfigureDao msgConfigureMapper; @Autowired private MsgReceiveService msgReceiveService; @Autowired private ViewPublicRewrite vp; @Override public void inspectorListener(InspectorIssue inspectorIssue) { if (!StrUtil.isEmpty(inspectorIssue.getUuid())) { if (!StrUtil.isEmpty(inspectorIssue.getDubanTime())) { logger.error("督办下发处理"); InspectorEventDispose inspectorEventDispose = new InspectorEventDispose(); //督查督办 String uuid = StringUtil.getUUID(); inspectorEventDispose.setIedUuid(uuid); inspectorEventDispose.setIedIeUuid(inspectorIssue.getUuid()); inspectorEventDispose.setIedExpireTime(inspectorIssue.getDubanTime()); inspectorEventDispose.setIedContent(inspectorIssue.getContent()); //督办下发持久化 inspectorEventMapper.insertDispose(inspectorEventDispose); logger.error("督办下发数据新增完成"); //督办文件持久化 List<FileEntity> files = inspectorIssue.getFiles(); List<InspectorFile> fileList = new ArrayList<>(); downloadFile(files, fileList, uuid); inspectorEventMapper.insertFiles(fileList); logger.error("督办下发完成"); } else if (!StrUtil.isEmpty(inspectorIssue.getSrcUnitCode())) { logger.error("督查下发处理"); InspectorEvent inspectorEvent = new InspectorEvent(); //督查下发 inspectorEvent.setIeUuid(inspectorIssue.getUuid()); inspectorEvent.setIeAreaCode(vp.getBusinessValue("gaol_code")); inspectorEvent.setIeEventType(inspectorIssue.getType()); inspectorEvent.setIeDescribe(inspectorIssue.getContent()); inspectorEvent.setIeGrabTime(inspectorIssue.getPublishTime()); inspectorEvent.setIeExpireTime(inspectorIssue.getQxTime()); inspectorEvent.setIeCusNunmber(vp.getBusinessValue("base_cus")); inspectorEvent.setIeNature(inspectorIssue.getNature()); inspectorEvent.setIeIsSj(0); //督查下发持久化 inspectorEventMapper.insertSynData(inspectorEvent); logger.error("督查下发数据新增成功"); //督查文件持久化 List<FileEntity> files = inspectorIssue.getFiles(); List<InspectorFile> fileList = new ArrayList<>(); downloadFile(files, fileList, inspectorIssue.getUuid()); inspectorEventMapper.insertFiles(fileList); logger.error("督查文件数据新增成功"); logger.error("督查下发完成"); } else { //督查办结 if (inspectorEventMapper.searchIsSj(inspectorIssue.getUuid()) > 0) { //修改督查状态为办结 inspectorEventMapper.updateState("3", inspectorIssue.getUuid()); logger.error("督查办结完成"); } } } } @Override public void noticeListener(Notice notice) { logger.error("通知通报下发开始处理"); //通知通报持久化 noticeMapper.insertData(notice); Msg msg = new Msg(); String uuid = StringUtil.getUUID(); msg.setMUuid(uuid); MsgConfigure msgConfigure = new MsgConfigure(); msgConfigure.setMcCode("NOTIC_ISSUE"); MsgConfigure config = msgConfigureMapper.selectByData(msgConfigure).get(0); msg.setMcUuid(config.getMcUuid()); msg.setMcMsglevel(config.getMcMsglevel()); msg.setMStatus(Byte.parseByte(notice.getFeedback() == 0 ? "1" : "0")); msg.setMParam(notice.getUuid()); msg.setMContent(notice.getTitle()); msg.setCreateTime(new Date()); if (notice.getFeedback() == 0) { msg.setMHandleTime(new Date()); msg.setMHandleUser("当前通知通报无需处置"); } msgMapper.insertMsg(msg); MsgReceive msgReceive = new MsgReceive(); msgReceive.setMrUuid(StringUtil.getUUID()); msgReceive.setmUuid(uuid); msgReceiveService.insertMsgReceive(msgReceive); //文件持久化 List<FileEntity> files = notice.getFiles(); noticeDownloadFile(files, notice.getUuid()); noticeMapper.insertFiles(files); } private void downloadFile(List<FileEntity> files, List<InspectorFile> fileList, String uuid) { logger.error("文件下载开始"); if (!files.isEmpty()) { for (FileEntity file : files) { InspectorFile inspectorFile = new InspectorFile(); String fileName = file.getFileName(); logger.error(fileName); inspectorFile.setIfFileName(fileName); String last = fileName.substring(fileName.lastIndexOf(".")); if (last.equals(".jpg") || last.equals(".JPG") || last.equals(".png") || last.equals(".gif") || last.equals(".bmp")) { inspectorFile.setIfFileType(1); } else { inspectorFile.setIfFileType(2); } inspectorFile.setIfSourceType(1); inspectorFile.setIfIeUuid(uuid); //需要确定省局的其他类型文件详情 inspectorFile.setIfPath(file.getFileName()); String fileId = file.getFileId(); //文件下载 String token = httpRequestService.sendPostMessage(); boolean res = httpRequestService.downloadFile(fileId, token, vp.getCusBusinessValue("duty_file_disk_mapping_path", "1000") + "/dutyUpLoad/"); if (res) { fileList.add(inspectorFile); } } } } private void noticeDownloadFile(List<FileEntity> files, String uuid) { files.stream().forEach((file) -> { file.setParentId(uuid); String token = httpRequestService.sendPostMessage(); httpRequestService.downloadFile(file.getFileId(), token, vp.getBusinessValue("notice_file_disk_mapping_path") + "/noticeUpLoad/"); }); } public boolean checkSj() { return vp.getBusinessValue("is_sj") != null && Boolean.parseBoolean(vp.getBusinessValue("is_sj")); } }
kafka消息发送方式有同步、异步和ONEWAY三种方式,producer.type参数指定同步或者异步,request.require.acks指定ONEWAY。
producer.type=sync默认同步
设置异步需配套配置
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。 |
queue.buffering.max.messages | 10000 | 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms | -1 | 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。 |
batch.num.messages | 200 | 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量) |
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。
在代码中如果需要同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。
kafkaTemplate.send().get("key",value);
异步发送只需要在发送成功获取消息是否成功即可:
ListenableFuture future = kafkaTemplate.send();
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error(ex.getMessage()+"发送失败!原因:"+ex.getCause());
}
@Override
public void onSuccess(Object result) {
logger.info("消息发送成功"+result.toString());
}
});
消息可靠性
producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。