赞
踩
更新了ssl证书脚本,添加了python版本的证书导出
#!/bin/bash
#################################
BASE_DIR=/data/kafka_2.11_auth
CERT_OUTPUT_PATH="$BASE_DIR/server_cert"
CLIENT_CERT_OUTPUT_PATH="$BASE_DIR/client_cert"
PASSWORD=graph123
KEY_STORE="$CERT_OUTPUT_PATH/kafka.keystore"
CLIENT_KEY_STORE="$CLIENT_CERT_OUTPUT_PATH/client.kafka.keystore"
TRUST_STORE="$CERT_OUTPUT_PATH/kafka.truststore"
CLIENT_TRUST_STORE="$CLIENT_CERT_OUTPUT_PATH/client.kafka.truststore"
CLUSTER_NAME=graphca
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert"
CLIENT_CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert-client"
DAYS_VALID=999
DNAME="CN=graph,OU=graph, O=graph, L=graph, ST=graph, C=CN"
#################################
mkdir -p $CERT_OUTPUT_PATH
mkdir -p $CLIENT_CERT_OUTPUT_PATH
echo "1:创建秘钥和证书"
keytool -keystore $KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $PASSWORD -keypass $PASSWORD -dname "$DNAME"
keytool -keystore $CLIENT_KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $PASSWORD -keypass $PASSWORD -dname "$DNAME"
echo "2:创建CA证书信任库"
openssl req -new -x509 -keyout ${CERT_OUTPUT_PATH}/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" \
-passin pass:"$PASSWORD" -passout pass:"$PASSWORD" \
-subj "/C=CN/ST=graph/L=graph/O=graph/CN=CN"
keytool -keystore "$TRUST_STORE" -alias CARoot \
-import -file "$CERT_AUTH_FILE" -storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
keytool -keystore "$CLIENT_TRUST_STORE" -alias CARoot \
-import -file "$CERT_AUTH_FILE" -storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
echo "3:CA证书签名"
keytool -keystore "$KEY_STORE" -alias "$CLUSTER_NAME" -certreq -file "$CLUSTER_CERT_FILE" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
keytool -keystore "$CLIENT_KEY_STORE" -alias "$CLUSTER_NAME" -certreq -file "$CLIENT_CLUSTER_CERT_FILE" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
penssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLUSTER_CERT_FILE" \
-out "${CLUSTER_CERT_FILE}-signed" \
-days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"
penssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLIENT_CLUSTER_CERT_FILE" \
-out "${CLIENT_CLUSTER_CERT_FILE}-signed" \
-days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"
echo "4:创建集群证书到keystore"
keytool -keystore "$TRUST_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLUSTER_CERT_FILE}-signed" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
keytool -keystore "$CLIENT_TRUST_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLIENT_CLUSTER_CERT_FILE}-signed" \
-storepass "$PASSWORD" -keypass "$PASSWORD" -noprompt
kafka server.properties
broker.id=100
host.name=dggts10012035
advertised.host.name=dggts10012035
delete.topic.enable=true
auto.create.topics.enable=true
num.network.thread=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.revice.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data01/kafka_2.11_auth/data
kafka.logs.dir=logs
num.partitions=3
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replcation.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741024
log.retention.check.interval.ms=300000
zookeeper.connect=dggts10012041:2181/kafka_ssl_test3
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0
#ssl认证部分
listeners=PLAINTEST://:9092,SASL_SSL://:9094
#如果不用ssl 下面需要修改
advertised.listeners=PLAINTEXT://dggta10012035:9092,SASL_SSL://dggta10012035:9094
#advertised.listeners=PLAINTEXT://dggta10012035:9092,SASL_PLAINTEXT://dggta10012035:9094
下一行改为以下6行全部注释掉
ssl.keystore.location=/data01/kafka_2.11_auth/server_cert/kafka.keystore
ssl.keystore.password=graph123
ssl.truststore.location=/data01/kafka_2.11_auth/server_cert/kafka.truststore
ssl.truststore.password=graph123
ssl.key.password=graph123
ssl.client.auth=required
#ACL入口部分
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimleAclAuthorizer
#SASL部分
#如果不使用SSL下面要改
security.inter.broker.protocal=SASL_PLAINTEXT
#security.inter.broker.protocal=SASL_SSL
sasl.machanism.inter.broker.protocol=PLAIN
SASL.enable.machanisms=PLAIN
#super user
super.user=User:admin
python证书导出
keytool -list -rfc -keystore client.kafka.keystore
keytool -export -alias graphca -keystore client.kafka.keystore -rfc -file certificate.pem
keytool -v -importkeystpre client.kafka.keystore -srcalias graphca -destkeystore cert_and_key.p12 -deststoretype PKCS12
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes
把输出---BEGIN PRIVATE KEY---到---END PRIVATE KEY---的拷贝到key.pem中
keytool -exportcert -alias graphca -keystore client.kafka.keystore -rfc -file CARoot.pem
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
}
client 端
KafkaCLient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader;
}
#acl 命令
#添加组权限
kafka-acls.sh \
--authorizer kafka.security.auth.SimplAclAuthorizer \
--authorizer-properties zookeeper.connect=dggta10012041:2181/kafka_sl_test3 \
--add --allow-principal User:reader --operation Read --group test-group
#添加用户权限
kafka-acls.sh \
--authorizer kafka.security.auth.SimplAclAuthorizer \
--authorizer-properties zookeeper.connect=dggta10012041:2181/kafka_sl_test3 \
--add --allow-principal User:reader --operation Read --topic test
python-kafka代码
因为SASL_SSL没有测试成功,导出的pkcs12格式的证书失败,python的demo只使用了SASL_PLAINTEXT的权限控制
from kafka import KafkaProducer, KafkaConsumer
print "step1"
consumer = KafkaConsumer("test1",
group_id = 'test-group',
bootstrap_server='dggts10012036:9094',
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username='admin',
sasl_plain_password='admin'
)
print "step2"
producer=KafkaProducer(bootstrap_server='dggts10012036:9094',
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username='admin',
sasl_plain_password='admin'
)
print "step3"
producer.send("test1", bytes("Hello World"))
producer.flush()
print "step4"
for msg in consumer:
print msg.value
java demo
java版本使用的是kafka-client 1.0.0
pom 文件如下:
org.apache.kafka
kafka-clients
1.0.0
producer
package com.study.kafka;
import java.util.Properties;
import com.sun.org.apache.xerces.internal.util.SynchronizedSymbolTable;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class sendMessageKafka {
public static void main(String[] args) {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "dggts10012036:9094");
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
System.setProperties("java.security.auth.login.config", "d:\\kafka_2.11_auth\\client_cert\\kafka_client_jaas_admin.conf");
/*
auth section
注销的第一行是使用的SASL_SSL
剩下的注释是配合SSL使用
*/
// producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG"SASL_SSL");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG"SASL_PLAINTEXT");
// producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "path of client.kafka.keystore");
// producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "graph123");
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "path of client.kafka.keystore");
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "graph123");
// producerProps.put(SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, "JKS");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
KafkaProducer producer = new KafkaProducer(producerProps);
try {
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("test1", Integer.toString(i), Integer.toString(i)));
System.out.println("Finish");
}
} catch (Exception e) {
e.printStackTrace();
}finally {
producer.close();
}
}
}
consumer
package com.study.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.protocol.types.Field;
import java.util.Arrays;
import java.util.Properties;
public class getMessageKafka {
public static void main(String[] args) {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dggts10012035:9094");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
System.setProperties("java.security.auth.login.config", "d:\\kafka_2.11_auth\\client_cert\\kafka_client_jaas_admin.conf");
/*
auth section
注销的第一行是使用的SASL_SSL
剩下的注释是配合SSL使用
*/
// consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG"SASL_SSL");
consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG"SASL_PLAINTEXT");
// consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "path of client.kafka.keystore");
// consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "graph123");
// consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "path of client.kafka.keystore");
// consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "graph123");
// consumerProps.put(SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, "JKS");
consumerProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("test1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record: records) {
System.out.printf("offset = %d, key= %s , value = %s\n", record.offset(), record.key(), record.value());
}
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。