当前位置:   article > 正文

Dapr(三) Dapr核心组件的使用一

Dapr(三) Dapr核心组件的使用一

结合前两期 Dapr(一) 基于云原生了解Dapr(Dapr(一) 基于云原生了解Dapr-CSDN博客) Dapr(二) 分布式应用运行时搭建及服务调用(Dapr(二) 分布式应用运行时搭建及服务调用-CSDN博客)

下篇推出dapr服务注册与发现,dapr组件绑定,dapr Actor功能。

目录

1.0 Dapr状态管理

1.1 Dapr状态组件配置文件

1.2 状态控制器

1.3 切换其它状态存储

1.4 工作原理

2.0 发布订阅

2.1 什么是发布订阅

2.2 设置发布订阅组件

2.3 控制器代码

2.3.1 发布控制器

2.3.2 订阅控制器

2.4 修改文件Program.cs

2.5 切换组件 

2.6 工作原理

总结:


1.0 Dapr状态管理

Dapr的状态管理允许应用程序保存和检索键值对数据,具有可插拔的存储、配置的行为和额外的安全特性。以下是主要特点:

  1. 可插拔状态存储:Dapr支持多种数据存储,比如MySQL、Redis、Azure CosmosDB等,可以在不修改代码的情况下切换。

  2. 配置存储行为:你可以指定并发控制和一致性级别。默认是最终一致性,但也支持强一致性。

  3. 并发控制:通过ETags实现乐观并发控制(OCC)。写操作需要匹配当前的ETag值,防止冲突。

  4. 自动加密:预览功能,支持应用程序状态的自动加密和密钥轮换。

  5. 一致性选项:可以选择强一致性的写入,等待所有副本确认,或者默认的最终一致性。

  6. 批量操作:支持一次性处理多条状态记录。

1.1 Dapr状态组件配置文件

Dapr默认使用的Redis进行存储。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: statestore
  5. spec:
  6. type: state.redis
  7. version: v1
  8. metadata:
  9. - name: redisHost
  10. value: localhost:6379
  11. - name: redisPassword
  12. value: ""
  13. - name: actorStateStore
  14. value: "true"

1.2 状态控制器

  1. public class StateController : ControllerBase
  2. {
  3. private readonly ILogger<StateController> _logger;
  4. private readonly DaprClient _daprClient;
  5. public StateController(ILogger<StateController> logger, DaprClient daprClient)
  6. {
  7. _logger = logger;
  8. _daprClient = daprClient;
  9. }
  10. // 获取一个值
  11. [HttpGet]
  12. public async Task<ActionResult> GetAsync()
  13. {
  14. var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
  15. return Ok(result);
  16. }
  17. //保存一个值
  18. [HttpPost]
  19. public async Task<ActionResult> PostAsync()
  20. {
  21. await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
  22. return Ok("done");
  23. }
  24. //删除一个值
  25. [HttpDelete]
  26. public async Task<ActionResult> DeleteAsync()
  27. {
  28. await _daprClient.DeleteStateAsync("statestore", "guid");
  29. return Ok("done");
  30. }
  31. //通过tag防止并发冲突,保存一个值
  32. [HttpPost("withtag")]
  33. public async Task<ActionResult> PostWithTagAsync()
  34. {
  35. var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
  36. await _daprClient.TrySaveStateAsync("statestore", "guid", Guid.NewGuid().ToString(), etag);
  37. return Ok("done");
  38. }
  39. //通过tag防止并发冲突,删除一个值
  40. [HttpDelete("withtag")]
  41. public async Task<ActionResult> DeleteWithTagAsync()
  42. {
  43. var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
  44. return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
  45. }
  46. // 从绑定获取一个值,健值name从路由模板获取
  47. [HttpGet("frombinding/{name}")]
  48. public ActionResult GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
  49. {
  50. return Ok(state.Value);
  51. }
  52. // 根据绑定获取并修改值,健值name从路由模板获取
  53. [HttpPost("withbinding/{name}")]
  54. public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
  55. {
  56. state.Value = Guid.NewGuid().ToString();
  57. return Ok(await state.TrySaveAsync());
  58. }
  59. // 获取多个个值
  60. [HttpGet("list")]
  61. public async Task<ActionResult> GetListAsync()
  62. {
  63. var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
  64. return Ok(result);
  65. }
  66. // 删除多个个值
  67. [HttpDelete("list")]
  68. public async Task<ActionResult> DeleteListAsync()
  69. {
  70. var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
  71. var removeList = new List<BulkDeleteStateItem>();
  72. foreach (var item in data)
  73. {
  74. removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
  75. }
  76. await _daprClient.DeleteBulkStateAsync("statestore", removeList);
  77. return Ok("done");
  78. }
  79. }

1.3 切换其它状态存储

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: statestore
  5. spec:
  6. type: state.mysql
  7. version: v1
  8. metadata:
  9. - name: connectionString
  10. value: "root:123456@tcp(192.168.157.157:3306)/?allowNativePasswords=true"

切换状态为MySql进行存储。

1.4 工作原理

应用程序与 Dapr sidecar 交互,以存储和检索键/值数据。 在底层,sidecar API 使用**可配置的状态存储组件**来保存数据。 开发人员可以从不断增长的受支持状态存储集合中选择,其中包括 Azure Cosmos DB、SQL Server 和 Cassandra。

2.0 发布订阅

2.1 什么是发布订阅

发布订阅(Publish-Subscribe)是一种通信模式,允许发布者发送消息到一个中心节点(通常是消息代理或主题),而不关心具体哪些订阅者会接收到这些消息。订阅者则注册他们感兴趣的特定类型的消息,当匹配的消息发布时,他们会收到通知。这种模式的特点在于解耦了发布者和订阅者,提高了系统的灵活性和可扩展性。

关键元素包括:

  1. 发布者 (Publisher): 生产消息的实体,它向主题或消息代理发送消息,无需了解谁会接收这些消息。
  2. 订阅者 (Subscriber): 对特定消息感兴趣并希望接收通知的实体,它们通过订阅主题或消息代理来表达兴趣。
  3. 主题 或 消息代理 (Topic or Message Broker): 中间媒介,接收并分发消息,确保消息从发布者到达正确的订阅者。

一个简单的示例是新闻系统,其中发布者发布新闻到特定类别,而订阅者选择关注他们感兴趣的类别。发布者不直接通知订阅者,而是通过消息代理进行,这样订阅者仅接收与其订阅相匹配的新闻。

发布订阅模式的应用场景通常涉及异步通信、事件驱动的系统或需要解耦组件的场景。

2.2 设置发布订阅组件

Dapr默认使用的Redis进行发布订阅

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. spec:
  6. type: pubsub.redis
  7. version: v1
  8. metadata:
  9. - name: redisHost
  10. value: localhost:6379
  11. - name: redisPassword
  12. value: ""

2.3 控制器代码

2.3.1 发布控制器

  1. [ApiController]
  2. [Route("[controller]")]
  3. public class PubsubController : ControllerBase
  4. {
  5. private DaprClient _daprClient;
  6. private ILogger<PubsubController> _logger;
  7. public PubsubController(DaprClient daprClient, ILogger<PubsubController> logger)
  8. {
  9. _daprClient = daprClient;
  10. _logger = logger;
  11. }
  12. /// <summary>
  13. /// 发布消息的方法
  14. /// </summary>
  15. /// <returns></returns>
  16. [HttpPost]
  17. [Route("pub")]
  18. public async Task<IActionResult> PublishMessage()
  19. {
  20. _logger.LogInformation("***发布消息***");
  21. var data = new UserInfo(10001,"操作员",19);
  22. await _daprClient.PublishEventAsync("pubsub", "topic",data);
  23. return Ok("***发布消息成功***");
  24. }
  25. }

2.3.2 订阅控制器

  1. [ApiController]
  2. [Route("[controller]")]
  3. public class SubController : ControllerBase
  4. {
  5. private ILogger<SubController> _logger;
  6. public SubController(ILogger<SubController> logger)
  7. {
  8. _logger = logger;
  9. }
  10. [HttpPost("sub")]
  11. [Topic("pubsub", "topic")]
  12. public IActionResult ConsumerMessage(UserInfo user)
  13. {
  14. _logger.LogInformation("***消费消息***");
  15. Console.WriteLine($"userId:{user.UserId} userName:{user.UserName}");
  16. return Ok();
  17. }
  18. }

2.4 修改文件Program.cs

  1. app.UseCloudEvents();
  2. app.MapSubscribeHandler();

2.5 切换组件 

切换为RabbitMQ

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.rabbitmq
  8. version: v1
  9. metadata:
  10. - name: host
  11. value: "amqp://123:123@192.168.157.157:5672"
  12. - name: durable
  13. value: "false"
  14. - name: deletedWhenUnused
  15. value: "false"
  16. - name: autoAck
  17. value: "false"
  18. - name: deliveryMode
  19. value: "0"
  20. - name: requeueInFailure
  21. value: "false"
  22. - name: prefetchCount
  23. value: "0"
  24. - name: reconnectWait
  25. value: "0"
  26. - name: concurrencyMode
  27. value: parallel
  28. - name: backOffPolicy
  29. value: "exponential"
  30. - name: backOffInitialInterval
  31. value: "100"
  32. - name: backOffMaxRetries
  33. value: "16"

2.6 工作原理

Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。服务将消息发布到指定主题, 业务服务订阅主题以使用消息。服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。

总结:

Dapr的发布订阅功能使得在分布式系统中实现发布/订阅消息模式变得更加简单。主要解决了不同消息产品之间实施复杂性和功能差异的问题。你可以通过Dapr的Sidecar API使用HTTP或gRPC来发布和订阅消息。以下是关键操作的概述:

  1. 发布(Publish)消息

    • 使用http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic> URL,其中 <dapr-port> 是Dapr Sidecar监听的端口,<pub-sub-name> 是选择的发布/订阅组件名,而 <topic> 是消息的目标主题。
  2. 订阅(Subscribe)消息

    • 应用程序在启动时,通过http://localhost:<appPort>/dapr/subscribe指定其订阅,其中 <appPort> 是应用程序监听的端口。
    • 订阅者处理消息后返回非错误响应,Dapr认为消息传递成功。
    • 支持订阅者通过响应负载中的状态进行精细化控制,比如指示重试(RETRY)或丢弃(DROP)消息。

Dapr的状态管理提供了一种跨服务持久化数据的方法,支持多种存储后端。关键特性包括:

  1. 原子性操作:支持原子性的读写操作,保证一致性。

  2. 版本控制:允许跟踪状态更改的历史版本,便于回滚。

  3. 事件驱动:状态变化可触发回调函数,实现基于状态变化的自动化操作。

  4. 过期策略:可设置状态项的过期时间。

  5. 备份与恢复:提供状态备份和恢复机制,确保高可用性。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/383601
推荐阅读
相关标签
  

闽ICP备14008679号