赞
踩
关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
功能项 | Kafka(1.1.0版本) | RabbitMQ(3.6.10版本) | RocketMQ(4.2.0) |
---|---|---|---|
单机吞吐量 | 10w级,支持高吞吐 | 万级 | 10w级 |
消息可靠性 | 经过参数优化配置,做到0丢失 | 基本不丢 | 经过参数优化配置,做到0丢失 |
优先级队列 | 不支持 | 支持,建议优先级大小设置在0-10之间 | 支持,维护不同的优先级队列,根据message的优先级发送到对于的队列中 |
延迟队列 | 不支持 | 支持 | 支持 |
死信队列 | 不支持 | 支持 | 支持 |
重试队列 | 不支持 | 支持 | 支持 |
消费模式 | pull + push | pull + push | pull+长轮询 |
广播消费 | 支持,kafka对于广播消息的支持更加正统 | 支持 | 支持 |
批量消息 | 支持,生产者异步发送 | 支持 | 支持,生产者同步发送 |
消息回溯 | 支持.Kakfa支持按照offset和timestamp两种维度进行消息回溯 | 不支持,一旦被确认消费就会被删除 | 支持 |
消息堆积 | 支持,海量消息堆积,堆积能力和磁盘大小挂钩 | 支持,但是内存达到阀值时,性能会受影响 | 支持海量消息堆积 |
持久化方式 | 消息队列,segment方式 | 支持 | 消息队列 |
消息追踪 | 不支持。消息追踪可以通过外部系统来支持,但是支持粒度没有内置的细腻。 | 支持 | 支持 |
消息过滤 | 客户端级别的支持,可用过kafka stream进行消息过滤 | 不支持,但是二次封装一下也较简单 | 支持,可通过message tag、属性进行过滤 |
多租户 | 不支持 | 支持 | 不支持 |
多协议支持 | 只支持定义协议,目前几个主流版本之间存在兼容性问题 | AMQP | JMS ,OpenMessaging |
跨语言支持 | 当前版本采用Java编写,支持多种语言的客户端 | 采用Erlang编写,支持多种语言客户端 | Java, C++, Go |
流量控制 | 支持client和user级别,通过主动设置可将流控作用于生产者或消费者 | 流量控制基于credit-base算法,是内部被动触发的保护机制,作用于生产者层面 | RocketMQ提供了针对于不同维度的流量控制 |
消息顺序性 | 支持普通的顺序消息,即对于单个分区的消息发送和消费是有序的,但是不保证不重复 | 顺序性的条件比较苛刻,需要单线程发送,单线程消费并且不采用延迟队列、优先级队列等一些高级功能,从某种意义上来书不支持顺序性 | 支持普通的顺序消息和严格的顺序消息 |
幂等性 | 支持单个Producer单个分区的会话幂等性 | 不支持 | 不支持,不解决消息的重复问题 |
事务性消息 | 最新版支持事务消息 | 支持 | 最新版的metaq支持事务消息 |
高可用和容错 | 使用partition的副本机制和isr选举机制保证高可用 | 普通集群非高可用,可用镜像模式和主备集群 | 通过broker的master和slave实现高可用 |
定时消息 | 不支持 | 支持 | 支持 |
负载均衡 | 客户端消费者负载均衡,需要一个broker作为coordinator | 默认是轮询 | 客户端负载均衡,支持平均和轮询分配 |
刷盘策略 | 默认是异步刷盘 | 默认是内存存储 | 默认同步刷盘 |
Kafka是一种高吞吐量的分布式发布订阅消息系统,是观察者模式的一种实现。
Kafka是一个分布式消息队列。
两个需要通信的系统中,如果数据生产者和消费者之间的速度差异大,可以通过kafka作为中间件,削峰。
向Kafka集群(Broker)中一个或多个topic push消息;
逻辑上的一个订阅者。
consumer数量 = 分区数
。承载消息的逻辑容器,本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成。
num.partitions=3
配置;一个有序不变的消息队列,每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件。
default.replication.factor=N
。每个partition中的每条消息都会被分配一个offset,单调递增且不变。
offset.kafka
来命名;00000000000.kafka
。已发布的消息保存在一组服务器中,称之为Kafka集群;每个节点的kafka实例都是一个代理(Broker)。
角色类似于其他分布式系统Master的角色。
运行在broker上,但是一个集群同时只能存在一个 Controller,且与数据节点在一起。
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
消息发送过程:
目前kafka默认消息交付保证at least once,消息不丢但是回重复传递。
ack取值说明:ack配置越高,吞吐量越低。
ack取值 | 说明 |
---|---|
0 | 发送出去不接受确认信息 |
1 | 只要leader回复确认信息即可 |
all或者-1 | 代表kafka集群必须全部回复确认信息 |
unclean.leader.election.enable=true
允许选举OSR作为leaderAR (Assigned Replicas,分区中的副本)
ISR (In Sync Replicas):与leader保持一定程度同步的副本(包括leader副本在内)组成 。follow副本相对于leader副本而言会有一定程度的滞后,这个范围可以通过参数进行配置。
OSR (Out-of-Sync Replied):于leader副本同步滞后过多的副本(不包括leader副本)。
AR = ISR + OSR:正常情况下,所有的follower副本都应该与leader 副本保持 一定程度的同步,即AR=ISR,OSR集合为空。
ack=all
或者 ack=2、3
;重试次数retries>1;unclean.leader.election.enable=false数据发送到broker后是先保存在缓存中,后面再刷硬盘里。
在kafka中设置缓存刷到硬盘的时间间隔缩短,减少数据丢失概率。
比起丢数据,重复消费数据问题更小。客户端在接受kafka数据时注意建立幂等机制。
consumer采用pull(拉)模式从broker中读取数据。
kafka前期版本用来管理kafka集群、数据存储。
kafka2.8开始引入KRaft协议,逐渐减少对zk的依赖;4.0版本不再支持zk模式。
watch
/controller znode
节点,如果掉线旧重新注册。节点 | 功能 | 原理 |
---|---|---|
/brokers/ids | 注册broker,保存broker信息:物理地址、版本、启动时间 | 在zk中创建临时节点,broker会发心跳给zk |
/brokers/topics | 注册topic,保存topic信息。 每个topic节点下包含一个partitions节点; 每个partitions节点下由保存了一个state节点; state保存着当前leader分区和ISR的brokerID,同时维护ISR列表。 | zk中创建的临时节点 |
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] | 维护消费者和分区的注册关系,保存消费者组消费的是哪个partition | zk中临时节点。如果消费者有挂掉,zk进行剔除并重新分区。 |
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] | 分区消息的消费进度offset |
把集群中节点进行划分,有controller 和 broker 两种角色。
新版本数据的存储:
数据 | 存储位置 |
---|---|
offset | 存放在一个专门的topic中的partition里 |
使用kafka的消息量很大的情况下,容易出现rebalance。rebalance期间partition的读写阻塞,直至rebalance完成。降低rebalance次数,能提高性能。
消费者组下的消费者与topic下的partition重新匹配的过程。
目的:实现消费者的负载均衡和高可用性。
针对场景1和2,是由coordinator(协调者)进行rebalance的。
coordinator一般是leader节点所在的broker,通过心跳监测consumer是否超时。具体步骤:
针对场景3和4,由于topic和partition的变更broker无法感知,coordinator也就无法感知。由leader consumer监控topic变化,通知coordinator进行rebalance。
rebalance导致的重复消费问题:
consumer1宕掉之后,消费了一半的消息没有提交offset;rebalance之后进行了二次消费。此时consumer1恢复后进行了重复消费。
解决方案:
每次rebalance时标记Generation,rebalance成功后+1,consumer1再次提交offset时发现Generation值不同,则拒绝提交。
可以使用spring的kafka,依赖如下:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
原生用法如下。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
spring:
kafka:
properties:
max.request.size: 10097152
zookeeper.connection.timeout.ms: 100000
consumer:
bootstrap-servers: 10.10.10.10:9092
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.request.size: 10097152
max.partition.fetch.bytes: 10097152
group-id: demo-id
@Configuration public class KafkaConfig { @Bean public Properties getProperties(){ Properties properties = new Properties(); properties.put("bootstrap.servers", "ip1:9092,ip2:9092,ip3:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("lingers.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return properties; } @Bean public KafkaProducer<String, String> kafkaProducer(@Autowired @Qualifier("getProperties") Properties kafkaProperties) { return new KafkaProducer<String, String>(kafkaProperties); } }
@Autowired
private Producer<String, String> producer;
producer.send(new ProducerRecord<String, String>("luowenhui", JSONObject.toJSONString(userinfo)));
注意:给kafka里送的消息,一般一条条的发送数据,这样可以轮询分发给不同的consumer,提高性能。对于有关联的数据可以选择批量发送。
@Configuration public class KafkaConfig { @Bean public Properties kafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", "ip1:9092,ip2:9092,ip3:9092"); properties.put("group.id", "hello"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return properties; } @Bean public KafkaConsumer<String, String> kafkaConsumer0(@Autowired @Qualifier("kafkaProperties") Properties kafkaProperties) { KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties); kafkaConsumer.assign(Arrays.asList(new TopicPartition("luo", 0)));//按区接收 //消费者订阅的topic, 可同时订阅多个 //consumer.subscribe(Arrays.asList("first", "second","third")); return kafkaConsumer; } }
assign方法和subscribe区别:
Kafka里的消费者组有两个使用的场景:
while (true) {
ConsumerRecords<String, String> records = consumer0.poll(100);
for (ConsumerRecord<String, String> record : records) {//record.value()就是获取到的数据
}
}
Ctrl+C停止命令;
注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。
## 进入broker配置
[root@node1 kafka_2.11-1.1.1]# vi config/server.properties
## 启动kafka |
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties ## 关闭集群
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
说明 | 命令 | 备注 |
---|---|---|
创建topic | ./kafka-topics.sh --zookeeper node01:2181 --create --topic topic_name --partitions 30 --replication-factor 2 | –replication-factor 定义副本数(副本数不能大于kafka的集群节点数,否则会报错); --partitions 定义分区数 |
删除topic | ./kafka-topics.sh --zookeeper node1:2181 --delete --topic test ./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper node01:2181 --topic t_cdr | 需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启 |
查看topic列表 | ./kafka-topics.sh --zookeeper hadoop102:2181 --list | _consumer_offsets的topic里面存储着offset,kafka默认分为50个分区,分布再不同的主机上。每存入一次数据,_consumer_offsets+1 |
查看Topic详情 | ./kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first | –Leader:Leader的主机号 broker-id; --Replicas:副本所在的主机号 0 1 2 代表三个主机; --Isr:选举使用,并且是按数据性差异排序的,与leader差异性最小,排名越靠前,刚开始的leader是随机选取的 |
kafka-console-consumer.sh 脚本(在kafka安装目录的bin下)是一个简易的消费者控制台。该 shell 脚本的功能通过调用 kafka.tools 包下的 ConsoleConsumer 类,并将提供的命令行参数全部传给该类实现。
## 发送消息:控制台向topic生产数据
bin/kafka-console-producer.sh --broker-list node86:9092 --topic t_cdr
## 消费消息:控制台消费topic的数据 添加--from-beginning参数表示从头到尾的数据,不添加表示最新的数据
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic t_cdr --from-beginning
## 查看topic消费数据
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --property print.key=true --topic topicName
其他属性:
参数 | 值类型 | 说明 | 有效值 |
---|---|---|---|
–topic | string | 被消费的topic | |
–whitelist | string | 正则表达式,指定要包含以供使用的主题的白名单 | |
–partition | integer | 指定分区;除非指定’–offset’,否则从分区结束(latest)开始消费 | |
–offset | string | 执行消费的起始offset位置;默认值:latest | latest ;earliest ;<offset> |
–consumer-property | string | 将用户定义的属性以key=value的形式传递给使用者 | |
–consumer.config | string | 消费者配置属性文件;请注意,[consumer-property]优先于此配置 | |
–formatter | string | 用于格式化kafka消息以供显示的类的名称;默认值:kafka.tools.DefaultMessageFormatter | kafka.tools.DefaultMessageFormatter;kafka.tools.LoggingMessageFormatter;kafka.tools.NoOpMessageFormatter;kafka.tools.ChecksumMessageFormatter |
–property | string | 初始化消息格式化程序的属性 | `print.timestamp=true |
–from-beginning | 从存在的最早消息开始,而不是从最新消息开始 | ||
–max-messages | integer | 消费的最大数据量,若不指定,则持续消费下去 | |
–timeout-ms | integer | 在指定时间间隔内没有消息可用时退出 | |
–skip-message-on-error | 如果处理消息时出错,请跳过它而不是暂停 | ||
–bootstrap-server | string | 必需(除非使用旧版本的消费者),要连接的服务器 | |
–key-deserializer | string | ||
–value-deserializer | string | ||
–enable-systest-events | 除记录消费的消息外,还记录消费者的生命周期;(用于系统测试) | ||
–isolation-level | string | 设置为read_committed以过滤掉未提交的事务性消息;设置为read_uncommitted以读取所有消息;默认值:read_uncommitted | |
–group | string | 指定消费者所属组的ID | |
–blacklist | string | 要从消费中排除的主题黑名单 | |
–csv-reporter-enabled | 如果设置,将启用csv metrics报告器 | ||
–delete-consumer-offsets | 如果指定,则启动时删除zookeeper中的消费者信息 | ||
–metrics-dir | string | 输出csv度量值;需与[csv-reporter-enable]配合使用 | |
–zookeeper | string | 必需(仅当使用旧的使用者时)连接zookeeper的字符串。可以给出多个URL以允许故障转移 |
说明 | 命令 | 备注 |
---|---|---|
查询用户组 | ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list | |
查看组详情 | ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group MyGrp --describe | 可以查看消费挤压 |
修改偏移量 | ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group you_consumer_group_name --topic you_topic_name --execute --reset-offsets --to-offset 80 | --to-latest 修改到最近 |
返回数据说明:
字段 | 说明 |
---|---|
HOST | |
CLIENT-ID | |
CURRENT-OFFSET | |
LOG-END-OFFSET | |
LAG | 未消费数量,若与预期不符表示有积压。 |
CONSUMER-ID |
#broker的全局唯一编号,不能重复 broker.id=0 #删除topic功能使能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的现成数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka运行日志存放的路径 log.dirs=/opt/module/kafka/logs #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接Zookeeper集群地址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:218
属性 | 默认值 | 描述 |
---|---|---|
broker.id | 必填参数,broker的唯一标识 | |
log.dirs | /tmp/kafka-logs | Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录。 |
port | 9092 | BrokerServer接受客户端连接的端口号 |
zookeeper.connect | null | Zookeeper的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。可以填一个或多个,为了提高可靠性,建议都填上。注意,此配置允许我们指定一个zookeeper路径来存放此kafka集群的所有数据,为了与其他应用集群区分开,建议在此配置中指定本集群存放目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消费者的参数要和此参数一致。 |
message.max.bytes | 1000000 | 服务器可以接收到的最大的消息大小。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费。 |
num.io.threads | 8 | 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量。 |
queued.max.requests | 500 | I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。 |
socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUFF buffer the server prefers for socket connections. |
socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUFF buffer the server prefers for socket connections. |
socket.request.max.bytes | 100 * 1024 * 1024 | 服务器允许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size. |
num.partitions | 1 | 默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5 |
log.segment.bytes | 1024 * 1024 * 1024 | Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。 |
log.roll.{ms,hours} | 24 * 7 hours | 新建segment文件的时间,此值可以被topic级别的参数覆盖。 |
log.retention.{ms,minutes,hours} | 7 days | Kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。 |
log.retention.bytes | -1 | 每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。 |
log.retention.check.interval.ms | 5 minutes | 删除策略的检查周期 |
auto.create.topics.enable | true | 自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。 |
default.replication.factor | 1 | 默认副本数量,建议改为2。 |
replica.lag.time.max.ms | 10000 | 在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除。 |
replica.lag.max.messages | 4000 | 如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除。 |
replica.socket.timeout.ms | 30 * 1000 | replica向leader发送请求的超时时间。 |
replica.socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests to the leader for replicating data. |
replica.fetch.max.bytes | 1024 * 1024 | The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. |
replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader. |
num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker. |
fetch.purgatory.purge.interval.requests | 1000 | The purge interval (in number of requests) of the fetch request purgatory. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session 超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。 此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。 |
zookeeper.connection.timeout.ms | 6000 | 客户端连接zookeeper的超时时间。 |
zookeeper.sync.time.ms | 2000 | H ZK follower落后 ZK leader的时间。 |
controlled.shutdown.enable | true | 允许broker shutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders转移到其它brokers上,建议启用,增加集群稳定性。 |
auto.leader.rebalance.enable | true | If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available. |
leader.imbalance.per.broker.percentage | 10 | The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker. |
leader.imbalance.check.interval.seconds | 300 | The frequency with which to check for leader imbalance. |
offset.metadata.max.bytes | 4096 | The maximum amount of metadata to allow clients to save with their offsets. |
connections.max.idle.ms | 600000 | Idle connections timeout: the server socket processor threads close the connections that idle more than this. |
num.recovery.threads.per.data.dir | 1 | The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. |
unclean.leader.election.enable | true | Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss. |
delete.topic.enable | false | 启用deletetopic参数,建议设置为true。 |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic. |
offsets.retention.check.interval.ms | 600000 | The frequency at which the offset manager checks for stale offsets. |
offsets.topic.replication.factor | 3 | The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas. |
offsets.topic.segment.bytes | 104857600 | Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads. |
offsets.load.buffer.size | 5242880 | An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache. |
offsets.commit.required.acks | -1 | The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden. |
offsets.commit.timeout.ms | 5000 | The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout. |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。