Spring Cloud Stream是Spring Cloud体系内的一个框架,用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,其目的是简化消息业务在Spring Cloud应用中的开发。

Spring Cloud Stream的架构图如下所示,应用程序通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定的中间件绑定器Binder实现连接到外部代理。
Spring Cloud Stream的架构图
Spring Cloud Stream的实现基于发布/订阅机制,核心由四部分构成:Spring Framework中的Spring Messaging和Spring Integrataion,以及Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的统一消息编程模型,其核心对象如下:

  • Message: 消息对象,包含消息头Header和消息体Payload。
  • MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送致消息通道。
  • MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration:Spring Framework中用于支持企业集成的一种扩展机制,作用是提供一个简单的模型来构建企业集成解决方案,对Spring Messaging进行了扩展。

  • MessageDispatcher: 消息分发接口,用于分发消息和添加删除消息处理器。
  • MessageRouter:消息路由接口,定义默认的输出消息通道。
  • Filter:消息的过滤注解,用于配置消息过滤表达式。
  • Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
  • Splitter:消息的分割,用于将一条消息拆分成多条。


  • doBindProducer:绑定消息中间件客户端发送消息模块。
  • doBindConsumer:绑定消息中间件客户端接收消息模块。


Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka和RabbitMQ,Spring Cloud Alibaba中加入了RocketMQ Binder,用于将RocketMQ集成到Spring Cloud Stream。

Spring Cloud Alibaba RocketMQ架构图

Spring Cloud Alibaba RocketMQ的架构图如下所示:

  • MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口。
  • MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口。
  • Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
  • Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。

Spring Cloud Stream消息发送流程

Spring Cloud Stream消息发送流程如下图所示,包括发送、订阅、分发、委派、消息处理等,具体实现如下:

public interface Source {
    String OUTPUT = "output";

    MessageChannel output();

public interface MessageChannel {

    default boolean send(Message<?> message) {
        return this.send(message, -1L);

    boolean send(Message<?> var1, long var2);
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent, InterceptableChannel, MessageChannelMetrics, ConfigurableMetricsAware<AbstractMessageChannelMetrics>, IntegrationPattern {
	public boolean send(Message<?> messageArg, long timeout) {
	    // 省略部分代码
        sent = this.doSend(message, timeout);
        // 省略部分代码
        return sent;
	protected abstract boolean doSend(Message<?> var1, long var2);
protected boolean doSend(Message<?> message, long timeout) {
        try {
            return this.getRequiredDispatcher().dispatch(message);
        } catch (MessageDispatchingException var6) {
            String description = var6.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, var6);
public class DirectChannel extends AbstractSubscribableChannel {

	protected UnicastingDispatcher getDispatcher() {
        return this.dispatcher;
public class UnicastingDispatcher extends AbstractDispatcher {
	public final boolean dispatch(Message<?> message) {
        if (this.executor != null) {
            Runnable task = this.createMessageHandlingTask(message);
            return true;
        } else {
            return this.doDispatch(message);

	private boolean doDispatch(Message<?> message) {
        if (this.tryOptimizedDispatch(message)) {
            return true;
        } else {
            boolean success = false;
            Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
            if (!handlerIterator.hasNext()) {
                throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
            } else {
                ArrayList exceptions = null;

                while(!success && handlerIterator.hasNext()) {
                    MessageHandler handler = (MessageHandler)handlerIterator.next();

                    try {
                        success = true;
                    } catch (Exception var9) {
                        RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> {
                            return "Dispatcher failed to deliver Message";
                        }, var9);
                        if (exceptions == null) {
                            exceptions = new ArrayList();

                        boolean isLast = !handlerIterator.hasNext();
                        if (!isLast && this.failover) {
                            this.logExceptionBeforeFailOver(var9, handler, message);

                        this.handleExceptions(exceptions, message, isLast);

                return success;
private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
        Set<MessageHandler> handlers = this.getHandlers();
        return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel, SubscribableChannelManagement {

	public boolean subscribe(MessageHandler handler) {
        MessageDispatcher dispatcher = this.getRequiredDispatcher();
        boolean added = dispatcher.addHandler(handler);
        this.adjustCounterIfNecessary(dispatcher, added ? 1 : 0);
        return added;
public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {

	public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {
        // 创建Producer的messageHandler
        final MessageHandler producerMessageHandler;
        final ProducerDestination producerDestination;
        try {
            // 省略部分代码
            producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties, outputChannel, errorChannel);
            // 省略部分代码
            // 创建SendingHandler并调用subscribe
        ((SubscribableChannel)outputChannel).subscribe(new AbstractMessageChannelBinder.SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()), this.headersToEmbed, this.useNativeEncoding(producerProperties)));
        // 省略部分代码

Producer的MessageHandler是由消息中间件Binder来完成的,Spring Cloud Stream提供了创建MessageHandler的规范。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring 容器加载所有Bean。

