Kafka is a message queue, or more precisely a distributed streaming platform.
A streaming platform has three key capabilities:
Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
Store streams of records in a fault-tolerant, durable way.
Process streams of records as they occur.
Kafka is generally used for two broad classes of applications:
Building real-time streaming data pipelines that reliably move data between systems or applications.
Building real-time streaming applications that transform or react to streams of data.
Concepts:
Kafka runs as a cluster on one or more servers that can span multiple data centers.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.
Kafka has five core APIs:
The Producer API allows an application to publish a stream of records to one or more Kafka topics.
The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams into output streams.
The Connect API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
The Admin API allows managing and inspecting topics, brokers, and other Kafka objects.
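To make the Producer API concrete, here is a minimal sketch that publishes one record (a key, a value, and an implicit timestamp) to a topic. It assumes a broker at localhost:9092 and reuses the kafka-hw topic created in the quickstart below; the class name SimpleProducer is only illustrative.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address; change to your own.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record carries a key and a value; the timestamp is assigned automatically.
            producer.send(new ProducerRecord<>("kafka-hw", "key-1", "hello kafka"));
        } // close() flushes any buffered records before exiting
    }
}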
An analogy: queueing at the train station to buy tickets. There are a few scenarios (a code sketch of these waiting styles follows this list):
Why queue at all? Because the ticket window is slow and many people want tickets at the same time (when nobody else is buying, there is no queue).
A buyer in the queue has a maximum wait time (wait at most one hour, then give up).
If the ticket is bought within that hour, the purchase counts as a success.
Wait forever.
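One way to read this analogy against the Producer API is the blocking behavior of send(), which returns a Future. The mapping below is my own illustration rather than anything from the Kafka docs; the broker address and the "tickets" topic are made up.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class WaitingStyles {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("tickets", "one ticket please");

            // 1. Don't queue at all: fire and forget, never wait for the result.
            producer.send(record);

            // 2. Queue with a deadline: wait at most one hour, then give up
            //    (get() throws a TimeoutException if no acknowledgement arrives in time).
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata meta = future.get(1, TimeUnit.HOURS);

            // 3. Queue forever: block until the broker acknowledges the record.
            producer.send(record).get();
        }
    }
}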
A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it (Kafka supports multiple producers and consumers).
Each record within a partition is assigned a sequential id called the offset.
Kafka retains records for a configurable period (the documentation's example uses two days; the shipped default is seven days via log.retention.hours); once the period expires, the records are deleted. A consumer sketch that prints partitions and offsets follows.
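Below is a minimal Consumer API sketch that prints the partition and offset of each record it reads. The group id offset-demo is made up, and the topic kafka-hw matches the quickstart further down.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-demo");             // illustrative group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest retained offset, like --from-beginning in the console consumer.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka-hw"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // The offset identifies the record's position within its partition.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}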
Partitions in the log serve several purposes.
They allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the server that hosts it, but a topic may have many partitions, so it can handle an arbitrary amount of data.
They act as the unit of parallelism (more on this later).
Each partition has one server acting as the "leader" and zero or more servers acting as "followers". The leader handles all read and write requests for the partition, while the followers passively replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server acts as the leader for some of its partitions and a follower for others, so load is well balanced across the cluster. The Admin API sketch below shows one way to inspect leaders and replicas.
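As a sketch of the Admin API (and of the leader/follower layout), the following uses AdminClient.describeTopics to print the leader, replicas, and in-sync replicas of each partition. It assumes the multi-test topic and the node-1..node-3 bootstrap servers from the cluster walkthrough below.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class LeaderInfo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed cluster addresses; adjust to your own brokers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092,node-2:9092,node-3:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("multi-test"))
                    .all().get().get("multi-test");
            for (TopicPartitionInfo p : desc.partitions()) {
                // The leader handles reads/writes for the partition; followers replicate it.
                System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                        p.partition(), p.leader(), p.replicas(), p.isr());
            }
        }
    }
}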
Kafka ships with a bundled ZooKeeper, so there is no need to set up a separate one for this single-node quickstart.
Pick a server and upload the Kafka tarball to it (node2 is used as the example).
Unpack:
tar -xzf kafka_2.12-2.5.0.tgz
Rename:
mv kafka_2.12-2.5.0 kafka
Delete the tarball:
rm -rf kafka_2.12-2.5.0.tgz
Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Or start it in the background:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Create a topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafka-hw
List topics:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Describe the topic:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic kafka-hw
Start a producer:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-hw
Start a consumer:
1. By default, only records produced after the consumer starts are read:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-hw
2. Read from the beginning:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-hw --from-beginning
Start ZooKeeper first.
Install Kafka on all four machines (copy the Kafka directory from node2 to the other three machines).
Then modify the configuration file (config/server.properties) on each of the four servers:
The broker.id of each Kafka broker must be unique; node1 through node4 use ids 1 through 4:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
Do not put the log directory under /tmp:
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data/kafka-logs
zookeeper: point it at your own ZooKeeper cluster:
zookeeper.connect=node1:2181,node2:2181,node3:2181
Start Kafka on all four machines:
nohup bin/kafka-server-start.sh config/server.properties &
Check the log:
more nohup.out
Create a topic:
bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
bin/kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
bin/kafka-topics.sh --list --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092
Send some messages (producer):
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092,node4:9092 --topic test-clus
Then start a consumer:
bin/kafka-console-consumer.sh --bootstrap-server node7-1:9092,node7-2:9092,node7-3:9092,node7-4:9092 --topic test
Read from the beginning:
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092,node4:9092 --topic test-clus --from-beginning
Describe the topic:
bin/kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic test-clus
Delete a topic:
bin/kafka-topics.sh --delete --zookeeper node7-1:2181,node7-2:2181,node7-3:2181 --topic test-clus
Create a topic with multiple partitions and multiple replicas:
bin/kafka-topics.sh --create --zookeeper node-1:2181,node-2:2181,node-3:2181 --replication-factor 3 --partitions 2 --topic multi-test
Describe the topic:
bin/kafka-topics.sh --describe --zookeeper node-1:2181,node-2:2181,node-3:2181 --topic multi-test
For this failover test, the topic must be created with multiple partitions and multiple replicas.
Start a producer:
bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092,node-4:9092 --topic multi-test
Start a consumer:
bin/kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092 --topic multi-test --from-beginning
Kill the broker that is the leader; another broker becomes the new leader and the topic remains usable.
Write a string to a file (> overwrites, >> appends):
echo -e 'aaaaaaaaaaaaaa\nbbbbbbbbbbbbbb' > ~/kafka.txt
Configuration file (config/connect-file-source.properties):
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
Configuration file (config/connect-file-sink.properties):
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Configuration file (config/connect-standalone.properties):
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the
# converter we want to apply it to.
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
Start command:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Note: the file names and paths in the configuration files are hard-coded, so rename your input file to match (file=test.txt in the source config above).
Purpose: data coming from a producer (the console producer script or the source file) is imported into Kafka and then exported from Kafka into another text file, demonstrating both import into and export out of Kafka with Connect.
Create a topic:
bin/kafka-topics.sh --create \
--bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092 \
--replication-factor 3 \
--partitions 2 \
--topic streams-plaintext-input
List topics:
bin/kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
Describe the topic:
bin/kafka-topics.sh --describe --zookeeper node-1:2181,node-2:2181,node-3:2181 --topic streams-plaintext-input
Run the example program:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Start a producer:
bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092,node-4:9092 --topic streams-plaintext-input
Start a consumer:
bin/kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Example code:
package org.apache.kafka.streams.examples.wordcount;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public final class WordCountDemo {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092,node-2:9092,node-3:9092,node-4:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> source = builder.stream("streams-plaintext-input");

        final KTable<String, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .groupBy((key, value) -> value)
            .count();

        // need to override value serde to Long type
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}