赞
踩
结合前两期 Dapr(一) 基于云原生了解Dapr(Dapr(一) 基于云原生了解Dapr-CSDN博客) Dapr(二) 分布式应用运行时搭建及服务调用(Dapr(二) 分布式应用运行时搭建及服务调用-CSDN博客)
下篇推出dapr服务注册与发现,dapr组件绑定,dapr Actor功能。
目录
Dapr的状态管理允许应用程序保存和检索键值对数据,具有可插拔的存储、配置的行为和额外的安全特性。以下是主要特点:
可插拔状态存储:Dapr支持多种数据存储,比如MySQL、Redis、Azure CosmosDB等,可以在不修改代码的情况下切换。
配置存储行为:你可以指定并发控制和一致性级别。默认是最终一致性,但也支持强一致性。
并发控制:通过ETags实现乐观并发控制(OCC)。写操作需要匹配当前的ETag值,防止冲突。
自动加密:预览功能,支持应用程序状态的自动加密和密钥轮换。
一致性选项:可以选择强一致性的写入,等待所有副本确认,或者默认的最终一致性。
批量操作:支持一次性处理多条状态记录。
Dapr默认使用的Redis进行存储。
- apiVersion: dapr.io/v1alpha1
- kind: Component
- metadata:
- name: statestore
- spec:
- type: state.redis
- version: v1
- metadata:
- - name: redisHost
- value: localhost:6379
- - name: redisPassword
- value: ""
- - name: actorStateStore
- value: "true"
- public class StateController : ControllerBase
- {
- private readonly ILogger<StateController> _logger;
- private readonly DaprClient _daprClient;
- public StateController(ILogger<StateController> logger, DaprClient daprClient)
- {
- _logger = logger;
- _daprClient = daprClient;
- }
-
- // 获取一个值
- [HttpGet]
- public async Task<ActionResult> GetAsync()
- {
- var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
- return Ok(result);
- }
-
- //保存一个值
- [HttpPost]
- public async Task<ActionResult> PostAsync()
- {
- await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
- return Ok("done");
- }
-
- //删除一个值
- [HttpDelete]
- public async Task<ActionResult> DeleteAsync()
- {
- await _daprClient.DeleteStateAsync("statestore", "guid");
- return Ok("done");
- }
-
- //通过tag防止并发冲突,保存一个值
- [HttpPost("withtag")]
- public async Task<ActionResult> PostWithTagAsync()
- {
- var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
- await _daprClient.TrySaveStateAsync("statestore", "guid", Guid.NewGuid().ToString(), etag);
- return Ok("done");
- }
-
- //通过tag防止并发冲突,删除一个值
- [HttpDelete("withtag")]
- public async Task<ActionResult> DeleteWithTagAsync()
- {
- var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
- return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
- }
-
-
- // 从绑定获取一个值,健值name从路由模板获取
- [HttpGet("frombinding/{name}")]
- public ActionResult GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
- {
- return Ok(state.Value);
- }
-
-
- // 根据绑定获取并修改值,健值name从路由模板获取
- [HttpPost("withbinding/{name}")]
- public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
- {
- state.Value = Guid.NewGuid().ToString();
- return Ok(await state.TrySaveAsync());
- }
-
-
- // 获取多个个值
- [HttpGet("list")]
- public async Task<ActionResult> GetListAsync()
- {
- var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
- return Ok(result);
- }
-
- // 删除多个个值
- [HttpDelete("list")]
- public async Task<ActionResult> DeleteListAsync()
- {
- var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
- var removeList = new List<BulkDeleteStateItem>();
- foreach (var item in data)
- {
- removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
- }
- await _daprClient.DeleteBulkStateAsync("statestore", removeList);
- return Ok("done");
- }
- }
- apiVersion: dapr.io/v1alpha1
- kind: Component
- metadata:
- name: statestore
- spec:
- type: state.mysql
- version: v1
- metadata:
- - name: connectionString
- value: "root:123456@tcp(192.168.157.157:3306)/?allowNativePasswords=true"
切换状态为MySql进行存储。
应用程序与 Dapr sidecar 交互,以存储和检索键/值数据。 在底层,sidecar API 使用**可配置的状态存储组件**来保存数据。 开发人员可以从不断增长的受支持状态存储集合中选择,其中包括 Azure Cosmos DB、SQL Server 和 Cassandra。
发布订阅(Publish-Subscribe)是一种通信模式,允许发布者发送消息到一个中心节点(通常是消息代理或主题),而不关心具体哪些订阅者会接收到这些消息。订阅者则注册他们感兴趣的特定类型的消息,当匹配的消息发布时,他们会收到通知。这种模式的特点在于解耦了发布者和订阅者,提高了系统的灵活性和可扩展性。
关键元素包括:
一个简单的示例是新闻系统,其中发布者发布新闻到特定类别,而订阅者选择关注他们感兴趣的类别。发布者不直接通知订阅者,而是通过消息代理进行,这样订阅者仅接收与其订阅相匹配的新闻。
发布订阅模式的应用场景通常涉及异步通信、事件驱动的系统或需要解耦组件的场景。
Dapr默认使用的Redis进行发布订阅。
- apiVersion: dapr.io/v1alpha1
- kind: Component
- metadata:
- name: pubsub
- spec:
- type: pubsub.redis
- version: v1
- metadata:
- - name: redisHost
- value: localhost:6379
- - name: redisPassword
- value: ""
- [ApiController]
- [Route("[controller]")]
- public class PubsubController : ControllerBase
- {
- private DaprClient _daprClient;
- private ILogger<PubsubController> _logger;
-
- public PubsubController(DaprClient daprClient, ILogger<PubsubController> logger)
- {
- _daprClient = daprClient;
- _logger = logger;
- }
-
- /// <summary>
- /// 发布消息的方法
- /// </summary>
- /// <returns></returns>
- [HttpPost]
- [Route("pub")]
- public async Task<IActionResult> PublishMessage()
- {
- _logger.LogInformation("***发布消息***");
- var data = new UserInfo(10001,"操作员",19);
- await _daprClient.PublishEventAsync("pubsub", "topic",data);
-
- return Ok("***发布消息成功***");
- }
- }
- [ApiController]
- [Route("[controller]")]
- public class SubController : ControllerBase
- {
- private ILogger<SubController> _logger;
-
- public SubController(ILogger<SubController> logger)
- {
- _logger = logger;
- }
-
-
- [HttpPost("sub")]
- [Topic("pubsub", "topic")]
- public IActionResult ConsumerMessage(UserInfo user)
- {
- _logger.LogInformation("***消费消息***");
- Console.WriteLine($"userId:{user.UserId} userName:{user.UserName}");
- return Ok();
- }
- }
Program.cs
- app.UseCloudEvents();
- app.MapSubscribeHandler();
切换为RabbitMQ
- apiVersion: dapr.io/v1alpha1
- kind: Component
- metadata:
- name: pubsub
- namespace: default
- spec:
- type: pubsub.rabbitmq
- version: v1
- metadata:
- - name: host
- value: "amqp://123:123@192.168.157.157:5672"
- - name: durable
- value: "false"
- - name: deletedWhenUnused
- value: "false"
- - name: autoAck
- value: "false"
- - name: deliveryMode
- value: "0"
- - name: requeueInFailure
- value: "false"
- - name: prefetchCount
- value: "0"
- - name: reconnectWait
- value: "0"
- - name: concurrencyMode
- value: parallel
- - name: backOffPolicy
- value: "exponential"
- - name: backOffInitialInterval
- value: "100"
- - name: backOffMaxRetries
- value: "16"
Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。服务将消息发布到指定主题, 业务服务订阅主题以使用消息。服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。
Dapr的发布订阅功能使得在分布式系统中实现发布/订阅消息模式变得更加简单。主要解决了不同消息产品之间实施复杂性和功能差异的问题。你可以通过Dapr的Sidecar API使用HTTP或gRPC来发布和订阅消息。以下是关键操作的概述:
发布(Publish)消息:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
URL,其中 <dapr-port>
是Dapr Sidecar监听的端口,<pub-sub-name>
是选择的发布/订阅组件名,而 <topic>
是消息的目标主题。订阅(Subscribe)消息:
http://localhost:<appPort>/dapr/subscribe
指定其订阅,其中 <appPort>
是应用程序监听的端口。Dapr的状态管理提供了一种跨服务持久化数据的方法,支持多种存储后端。关键特性包括:
原子性操作:支持原子性的读写操作,保证一致性。
版本控制:允许跟踪状态更改的历史版本,便于回滚。
事件驱动:状态变化可触发回调函数,实现基于状态变化的自动化操作。
过期策略:可设置状态项的过期时间。
备份与恢复:提供状态备份和恢复机制,确保高可用性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。