赞
踩
首先创建订单微服务e-commerce-order-service
pom
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-     <parent>
-         <artifactId>e-commerce-service</artifactId>
-         <groupId>com.taluohui.ecommerce</groupId>
-         <version>1.0-SNAPSHOT</version>
-     </parent>
-     <modelVersion>4.0.0</modelVersion>
-
-     <artifactId>e-commerce-order-service</artifactId>
-     <version>1.0-SNAPSHOT</version>
-     <packaging>jar</packaging>
-
-     <!-- Module name and description -->
-     <name>e-commerce-order-service</name>
-     <description>订单服务</description>
-
-     <properties>
-         <maven.compiler.source>8</maven.compiler.source>
-         <maven.compiler.target>8</maven.compiler.target>
-     </properties>
-
-     <dependencies>
-         <!-- Spring Cloud Alibaba Nacos service discovery -->
-         <dependency>
-             <groupId>com.alibaba.cloud</groupId>
-             <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
-         </dependency>
-         <!-- zipkin = spring-cloud-starter-sleuth + spring-cloud-sleuth-zipkin -->
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-starter-zipkin</artifactId>
-         </dependency>
-         <dependency>
-             <groupId>org.springframework.kafka</groupId>
-             <artifactId>spring-kafka</artifactId>
-             <version>2.5.0.RELEASE</version>
-         </dependency>
-         <!-- Java Persistence API (the ORM specification) -->
-         <dependency>
-             <groupId>org.springframework.boot</groupId>
-             <artifactId>spring-boot-starter-data-jpa</artifactId>
-         </dependency>
-         <!-- MySQL driver; its version has to match the MySQL server version -->
-         <dependency>
-             <groupId>mysql</groupId>
-             <artifactId>mysql-connector-java</artifactId>
-             <version>8.0.12</version>
-             <scope>runtime</scope>
-         </dependency>
-         <!-- Spring Cloud Stream + Kafka binder -->
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-stream</artifactId>
-         </dependency>
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-stream-binder-kafka</artifactId>
-         </dependency>
-         <dependency>
-             <groupId>com.taluohui.ecommerce</groupId>
-             <artifactId>e-commerce-service-config</artifactId>
-             <version>1.0-SNAPSHOT</version>
-         </dependency>
-         <dependency>
-             <groupId>com.taluohui.ecommerce</groupId>
-             <artifactId>e-commerce-service-sdk</artifactId>
-             <version>1.0-SNAPSHOT</version>
-         </dependency>
-         <!-- Hystrix integration -->
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
-         </dependency>
-     </dependencies>
-
-     <!--
-         Spring Boot Maven plugin: repackages the application as an executable
-         jar or war that can then be run in the usual way.
-     -->
-     <build>
-         <!-- ${artifactId} is a deprecated expression; ${project.artifactId} is the supported form -->
-         <finalName>${project.artifactId}</finalName>
-         <plugins>
-             <plugin>
-                 <groupId>org.springframework.boot</groupId>
-                 <artifactId>spring-boot-maven-plugin</artifactId>
-                 <executions>
-                     <execution>
-                         <goals>
-                             <goal>repackage</goal>
-                         </goals>
-                     </execution>
-                 </executions>
-             </plugin>
-         </plugins>
-     </build>
- </project>
配置文件
- server:
-   port: 8002
-   servlet:
-     context-path: /ecommerce-order-service
-
- spring:
-   main:
-     allow-bean-definition-overriding: true # allow beans with the same name
-   application:
-     name: e-commerce-order-service # also part of the Nacos config dataId (when config.prefix is empty)
-   cloud:
-     stream:
-       kafka:
-         binder:
-           brokers: 1.15.247.9:9092
-           auto-create-topics: true
-       bindings:
-         logisticsOutput:
-           destination: e-commerce-topic # kafka topic
-           content-type: text/plain
-     alibaba:
-       seata:
-         tx-service-group: imooc-ecommerce # Seata global transaction group
-     nacos:
-       # service registration and discovery
-       discovery:
-         enabled: true # set to false to turn off Nacos registration/discovery
-         server-addr: 1.15.247.9:8848
-         # server-addr: 127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850 # Nacos server addresses
-         namespace: 22d40198-8462-499d-a7fe-dbb2da958648
-         metadata:
-           management:
-             context-path: ${server.servlet.context-path}/actuator
-     # enable the ribbon retry mechanism, i.e. retry on another node when a call fails
-     loadbalancer:
-       retry:
-         enabled: true
-   kafka:
-     bootstrap-servers: 1.15.247.9:9092
-     producer:
-       retries: 3
-     consumer:
-       auto-offset-reset: latest
-   sleuth:
-     sampler:
-       # ProbabilityBasedSampler strategy
-       probability: 1.0 # sampling ratio, 1.0 means 100%, default is 0.1
-       # RateLimitingSampler strategy; once a rate is set, spring.sleuth.sampler.probability is ignored
-       rate: 100 # number of traces accepted per second
-   zipkin:
-     sender:
-       type: kafka # default is web
-     base-url: http://1.15.247.9:9411/
-   jpa:
-     show-sql: true
-     hibernate:
-       ddl-auto: none
-     properties:
-       hibernate.show_sql: true
-       hibernate.format_sql: true
-     open-in-view: false
-   datasource:
-     # data source
-     url: jdbc:mysql://1.15.247.9:3306/ecommerce?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false
-     username: root
-     # NOTE(review): plaintext credential; consider supplying it from the environment or a config center
-     password: Cjw970404
-     type: com.zaxxer.hikari.HikariDataSource
-     driver-class-name: com.mysql.cj.jdbc.Driver
-     # connection pool
-     hikari:
-       maximum-pool-size: 8
-       minimum-idle: 4
-       idle-timeout: 30000
-       connection-timeout: 30000
-       max-lifetime: 45000
-       auto-commit: true
-       pool-name: ImoocEcommerceHikariCP
-
- # feign related settings
- feign:
-   hystrix:
-     enabled: true # Hystrix is disabled by default
-   client:
-     config:
-       default: # global
-         connectTimeout: 2000 # the default connect timeout is 10s
-         readTimeout: 5000
-
- # exposed actuator endpoints
- management:
-   endpoints:
-     web:
-       exposure:
-         include: '*'
-   endpoint:
-     health:
-       show-details: always
入口启动类
- /**
-  * <h1>Bootstrap class of the order microservice</h1>
-  * Turns on service discovery, Feign clients, circuit breaking and JPA auditing
-  * through the annotations below.
-  */
- @SpringBootApplication
- @EnableDiscoveryClient
- @EnableFeignClients
- @EnableCircuitBreaker
- @EnableJpaAuditing
- public class OrderApplication {
-
-     /**
-      * Launches the Spring application.
-      *
-      * @param args command line arguments handed to Spring Boot unchanged
-      */
-     public static void main(String[] args) {
-         SpringApplication.run(OrderApplication.class, args);
-     }
- }
订单的sql,这边只是简单的演示,还有很多字段,可以自行添加
- -- Create the t_ecommerce_order table (user orders).
- -- order_detail holds the order items as a JSON string (goodsId, count).
- CREATE TABLE IF NOT EXISTS `ecommerce`.`t_ecommerce_order` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
- `user_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '用户 id',
- `address_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '用户地址记录 id',
- `order_detail` text NOT NULL COMMENT '订单详情(json 存储, goodsId, count)',
- `create_time` datetime NOT NULL DEFAULT '0000-01-01 00:00:00' COMMENT '创建时间',
- `update_time` datetime NOT NULL DEFAULT '0000-01-01 00:00:00' COMMENT '更新时间',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8 COMMENT='用户订单表';
entity
- /**
-  * <h1>JPA entity mapped to the t_ecommerce_order table</h1>
-  * The creation and update timestamps are filled in by the auditing listener.
-  */
- @Entity
- @EntityListeners(AuditingEntityListener.class)
- @Table(name = "t_ecommerce_order")
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public class EcommerceOrder {
-
-     /** Auto-increment primary key. */
-     @Id
-     @GeneratedValue(strategy = GenerationType.IDENTITY)
-     @Column(name = "id", nullable = false)
-     private Long id;
-
-     /** Id of the user who owns the order. */
-     @Column(name = "user_id", nullable = false)
-     private Long userId;
-
-     /** Id of the user's address record. */
-     @Column(name = "address_id", nullable = false)
-     private Long addressId;
-
-     /** Order detail, stored as a JSON string. */
-     @Column(name = "order_detail", nullable = false)
-     private String orderDetail;
-
-     /** Creation time. */
-     @CreatedDate
-     @Column(name = "create_time", nullable = false)
-     private Date createTime;
-
-     /** Last update time. */
-     @LastModifiedDate
-     @Column(name = "update_time", nullable = false)
-     private Date updateTime;
-
-     /**
-      * Creates an order that has not been persisted yet; id and timestamps stay unset.
-      *
-      * @param userId      id of the ordering user
-      * @param addressId   id of the delivery address record
-      * @param orderDetail order items serialized as JSON
-      */
-     public EcommerceOrder(Long userId, Long addressId, String orderDetail) {
-         this.userId = userId;
-         this.addressId = addressId;
-         this.orderDetail = orderDetail;
-     }
- }
vo
- /**
-  * <h1>One page of simplified order details</h1>
-  */
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- @ApiModel(description = "分页订单详情对象")
- public class PageSimpleOrderDetail {
-
-     /** Orders contained in this page. */
-     @ApiModelProperty(value = "订单详情")
-     private List<SingleOrderItem> orderItems;
-
-     /** Whether further pages exist. */
-     @ApiModelProperty(value = "是否有更多的订单(分页)")
-     private Boolean hasMore;
-
-     /**
-      * <h2>A single order: id, address and purchased goods</h2>
-      */
-     @Data
-     @NoArgsConstructor
-     @AllArgsConstructor
-     @ApiModel(description = "单个订单信息对象")
-     public static class SingleOrderItem {
-
-         /** Primary key of the order row. */
-         @ApiModelProperty(value = "订单表主键 id")
-         private Long id;
-
-         /** Address of the user for this order. */
-         @ApiModelProperty(value = "用户地址信息")
-         private UserAddress userAddress;
-
-         /** Goods bought in this order. */
-         @ApiModelProperty(value = "订单商品信息")
-         private List<SingleOrderGoodsItem> goodsItems;
-     }
-
-     /**
-      * <h2>One goods entry inside an order</h2>
-      */
-     @Data
-     @NoArgsConstructor
-     @AllArgsConstructor
-     @ApiModel(description = "单个订单中的单项商品信息")
-     public static class SingleOrderGoodsItem {
-
-         /** Simplified goods information. */
-         @ApiModelProperty(value = "简单商品信息")
-         private SimpleGoodsInfo simpleGoodsInfo;
-
-         /** Number of units purchased. */
-         @ApiModelProperty(value = "商品个数")
-         private Integer count;
-     }
- }
以及用于微服务之间传递的类
- /**
-  * <h1>Order request submitted by a user</h1>
-  */
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- @ApiModel(description = "用户发起购买订单")
- public class OrderInfo {
-
-     /** Primary key of the user's address record. */
-     @ApiModelProperty(value = "用户地址表主键 id")
-     private Long userAddress;
-
-     /** Goods contained in the order. */
-     @ApiModelProperty(value = "订单中的商品信息")
-     private List<OrderItem> orderItems;
-
-     /**
-      * <h2>A single goods entry of the order</h2>
-      */
-     @Data
-     @NoArgsConstructor
-     @AllArgsConstructor
-     @ApiModel(description = "订单中的单项商品信息")
-     public static class OrderItem {
-
-         /** Primary key of the goods row. */
-         @ApiModelProperty(value = "商品表主键 id")
-         private Long goodsId;
-
-         /** Number of units to buy. */
-         @ApiModelProperty(value = "购买商品个数")
-         private Integer count;
-
-         /**
-          * Converts this item into an inventory deduction request.
-          *
-          * @return a {@code DeductGoodsInventory} built from this item's goods id and count
-          */
-         public DeductGoodsInventory toDeductGoodsInventory() {
-             return new DeductGoodsInventory(goodsId, count);
-         }
-     }
- }
dao
- /**
- * <h1>EcommerceOrder Dao 接口定义</h1>
- * */
- public interface EcommerceOrderDao extends PagingAndSortingRepository<EcommerceOrder, Long> {
-
- /**
- * <h2>根据 userId 查询分页订单</h2>
- * select * from t_ecommerce_order where user_id = ?
- * order by ... desc/asc limit x offset y
- * */
- Page<EcommerceOrder> findAllByUserId(Long userId, Pageable pageable);
- }
service接口
- /**
-  * <h1>Contract of the order service</h1>
-  */
- public interface IOrderService {
-
-     /**
-      * <h2>Place an order (distributed transaction)</h2>
-      * Steps: create order -> deduct inventory -> deduct balance
-      * -> publish logistics message (Stream + Kafka).
-      *
-      * @param orderInfo address id and goods items of the order
-      * @return a {@code TableId} for the created order
-      */
-     TableId createOrder(OrderInfo orderInfo);
-
-     /**
-      * <h2>Fetch the current user's orders, one page at a time</h2>
-      *
-      * @param page page number
-      * @return the orders of that page
-      */
-     PageSimpleOrderDetail getSimpleOrderDetailByPage(int page);
- }
由于订单微服务需要依赖其他微服务,所以需要使用到openFeign,进行微服务之间的通信。这里的openFeign只是简化使用,需要更多的设置,可以看前面专门写的openfeign。
由于所有的微服务都会做权限的校验,所以需要对openFeign进行配置,让它的请求头带上校验信息。配置文件如下。(在order微服务下创建feign文件夹,用于openFeign的使用)
- /**
-  * <h1>Forward the headers of the incoming request on Feign calls</h1>
-  */
- @Slf4j
- @Configuration
- public class FeignConfig {
-
-     /**
-      * <h2>Request interceptor registered for Feign</h2>
-      * Copies each header of the current servlet request (first value only, and
-      * never content-length) onto the outgoing Feign request template.
-      *
-      * @return the interceptor bean
-      */
-     @Bean
-     public RequestInterceptor headerInterceptor() {
-
-         return template -> {
-
-             ServletRequestAttributes attributes =
-                     (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
-             // No servlet request bound to this thread: nothing to forward
-             if (attributes == null) {
-                 return;
-             }
-
-             HttpServletRequest request = attributes.getRequest();
-             Enumeration<String> headerNames = request.getHeaderNames();
-             if (headerNames == null) {
-                 return;
-             }
-
-             while (headerNames.hasMoreElements()) {
-                 String headerName = headerNames.nextElement();
-                 String headerValue = request.getHeader(headerName);
-                 // The content-length of the current request must not be passed downstream:
-                 // the downstream request could hang or its response could be truncated
-                 if (headerName.equalsIgnoreCase("content-length")) {
-                     continue;
-                 }
-                 // template is Feign's request template for the outgoing call
-                 template.header(headerName, headerValue);
-             }
-         };
-     }
- }
下面创建不需要兜底策略的服务间通信。并不是所有服务间通信都需要兜底策略,有些调用一旦失败就需要及时报错,比如扣减用户余额,如果使用兜底策略,明显是不对的。
- /**
-  * <h1>Feign client of the account service without a fallback</h1>
-  * No fallback class is configured on this client.
-  */
- @FeignClient(
-         value = "e-commerce-account-service",
-         contextId = "NotSecuredBalanceClient"
- )
- public interface NotSecuredBalanceClient {
-
-     /**
-      * <h2>Deduct balance from a user account</h2>
-      *
-      * @param balanceInfo user id and the amount to deduct
-      * @return the response returned by the account service
-      */
-     @RequestMapping(
-             method = RequestMethod.PUT,
-             value = "/ecommerce-account-service/balance/deduct-balance"
-     )
-     CommonResponse<BalanceInfo> deductBalance(@RequestBody BalanceInfo balanceInfo);
- }
商品
- /**
-  * <h1>Feign client of the goods service without a fallback</h1>
-  * Declared with {@code @FeignClient} so that a client bean is created for it
-  * and can be injected; no fallback class is configured.
-  */
- @FeignClient(
-         contextId = "NotSecuredGoodsClient",
-         value = "e-commerce-goods-service"
- )
- public interface NotSecuredGoodsClient {
-
-     /**
-      * <h2>Deduct goods inventory</h2>
-      *
-      * @param deductGoodsInventories goods ids with the quantity to deduct for each
-      * @return the response returned by the goods service
-      */
-     @RequestMapping(
-             value = "/ecommerce-goods-service/goods/deduct-goods-inventory",
-             method = RequestMethod.PUT
-     )
-     CommonResponse<Boolean> deductGoodsInventory(
-             @RequestBody List<DeductGoodsInventory> deductGoodsInventories);
-
-     /**
-      * <h2>Query simplified goods information by ids</h2>
-      *
-      * @param tableId the goods ids to look up
-      * @return the response returned by the goods service
-      */
-     @RequestMapping(
-             value = "/ecommerce-goods-service/goods/simple-goods-info",
-             method = RequestMethod.POST
-     )
-     CommonResponse<List<SimpleGoodsInfo>> getSimpleGoodsInfoByTableId(
-             @RequestBody TableId tableId);
- }
在feign文件夹下创建hystrix文件夹,用于存放兜底策略
- /**
-  * <h1>Feign client of the account service with a fallback</h1>
-  * Falls back to {@link AddressClientHystrix}.
-  */
- @FeignClient(
-         value = "e-commerce-account-service",
-         contextId = "AddressClient",
-         fallback = AddressClientHystrix.class
- )
- public interface AddressClient {
-
-     /**
-      * <h2>Query address information by ids</h2>
-      *
-      * @param tableId ids of the address records to look up
-      * @return the response returned by the account service
-      */
-     @RequestMapping(
-             method = RequestMethod.POST,
-             value = "/ecommerce-account-service/address/address-info-by-table-id"
-     )
-     CommonResponse<AddressInfo> getAddressInfoByTablesId(@RequestBody TableId tableId);
- }
兜底策略
- /**
-  * <h1>Fallback used for {@link AddressClient}</h1>
-  */
- @Slf4j
- @Component
- public class AddressClientHystrix implements AddressClient {
-
-     /**
-      * Logs the failed lookup and returns a placeholder response.
-      *
-      * @param tableId the ids that were requested
-      * @return a response with code -1 and an {@code AddressInfo} holding no address items
-      */
-     @Override
-     public CommonResponse<AddressInfo> getAddressInfoByTablesId(TableId tableId) {
-         // The two concatenated literals used to join without a space ("infoerror")
-         log.error("[account client feign request error in order service] get address info "
-                 + "error: [{}]", JSON.toJSONString(tableId));
-         return new CommonResponse<>(
-                 -1,
-                 "[account client feign request error in order service]",
-                 new AddressInfo(-1L, Collections.emptyList())
-         );
-     }
- }
- /**
-  * <h1>Feign client of the goods service with a fallback</h1>
-  * Falls back to {@link GoodsClientHystrix}.
-  */
- @FeignClient(
-         value = "e-commerce-goods-service",
-         contextId = "SecuredGoodsClient",
-         fallback = GoodsClientHystrix.class
- )
- public interface SecuredGoodsClient {
-
-     /**
-      * <h2>Query simplified goods information by ids</h2>
-      *
-      * @param tableId the goods ids to look up
-      * @return the response returned by the goods service
-      */
-     @RequestMapping(
-             method = RequestMethod.POST,
-             value = "/ecommerce-goods-service/goods/simple-goods-info"
-     )
-     CommonResponse<List<SimpleGoodsInfo>> getSimpleGoodsInfoByTableId(
-             @RequestBody TableId tableId);
- }
兜底策略
- /**
-  * <h1>Fallback used for {@link SecuredGoodsClient}</h1>
-  */
- @Slf4j
- @Component
- public class GoodsClientHystrix implements SecuredGoodsClient {
-
-     /**
-      * Logs the failed lookup and returns a placeholder response.
-      *
-      * @param tableId the goods ids that were requested
-      * @return a response with code -1 and an empty goods list
-      */
-     @Override
-     public CommonResponse<List<SimpleGoodsInfo>> getSimpleGoodsInfoByTableId(TableId tableId) {
-         // The two concatenated literals used to join without a space ("goodserror")
-         log.error("[goods client feign request error in order service] get simple goods "
-                 + "error: [{}]", JSON.toJSONString(tableId));
-         return new CommonResponse<>(
-                 -1,
-                 "[goods client feign request error in order service]",
-                 Collections.emptyList()
-         );
-     }
- }
- <!-- Seata starter from Spring Cloud Alibaba -->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
- </dependency>
- <!-- Connection pool needed for registering the Seata data source -->
- <dependency>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP</artifactId>
- <optional>true</optional>
- </dependency>
- -- undo_log table in the business database `ecommerce`.
- -- NOTE(review): presumably the table Seata writes its rollback records to - confirm against the Seata docs.
- CREATE TABLE IF NOT EXISTS `ecommerce`.`undo_log` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT,
- `branch_id` bigint(20) NOT NULL,
- `xid` varchar(100) NOT NULL,
- `context` varchar(128) NOT NULL,
- `rollback_info` longblob NOT NULL,
- `log_status` int(11) NOT NULL,
- `log_created` datetime NOT NULL,
- `log_modified` datetime NOT NULL,
- `ext` varchar(100) DEFAULT NULL,
- PRIMARY KEY (`id`),
- UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
file.conf
- ## transaction log store, only used in seata-server
- store {
- ## store mode: file, db, redis
- mode = "db"
-
- ## file store property
- file {
- ## store location dir
- dir = "sessionStore"
- # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
- maxBranchSessionSize = 16384
- # globe session size , if exceeded throws exceptions
- maxGlobalSessionSize = 512
- # file buffer size , if exceeded allocate new buffer
- fileWriteBufferCacheSize = 16384
- # when recover batch read size
- sessionReloadReadSize = 100
- # async, sync
- flushDiskMode = async
- }
-
- ## database store property
- db {
- ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
- datasource = "druid"
- ## mysql/oracle/postgresql/h2/oceanbase etc.
- dbType = "mysql"
- driverClassName = "com.mysql.jdbc.Driver"
- url = "jdbc:mysql://127.0.0.1:3306/seata?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false"
- user = "root"
- password = "root"
- minConn = 5
- maxConn = 100
- globalTable = "global_table"
- branchTable = "branch_table"
- lockTable = "lock_table"
- queryLimit = 100
- maxWait = 5000
- }
-
- ## redis store property
- redis {
- host = "127.0.0.1"
- port = "6379"
- password = ""
- database = "0"
- minConn = 1
- maxConn = 10
- maxTotal = 100
- queryLimit = 100
- }
-
- }
- ## transaction group: the suffix of vgroupMapping matches the tx-service-group
- ## (imooc-ecommerce) configured in the services; it is mapped to "default" here
- service {
- vgroupMapping.imooc-ecommerce = "default"
- default.grouplist = "127.0.0.1:8091"
- }
- client {
- async.commit.buffer.limit = 10000
- lock {
- retry.internal = 10
- retry.times = 30
- }
- }
注意:需要在MySQL中创建库和表
- -- database that holds the three tables created below
- CREATE DATABASE `seata`;
-
- -- global_table: one row per xid
- -- NOTE(review): presumably stores GlobalSession data, by analogy with the comments on the tables below - confirm
- CREATE TABLE IF NOT EXISTS `seata`.`global_table`
- (
- `xid` VARCHAR(128) NOT NULL,
- `transaction_id` BIGINT,
- `status` TINYINT NOT NULL,
- `application_id` VARCHAR(32),
- `transaction_service_group` VARCHAR(32),
- `transaction_name` VARCHAR(128),
- `timeout` INT,
- `begin_time` BIGINT,
- `application_data` VARCHAR(2000),
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY (`xid`),
- KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
- KEY `idx_transaction_id` (`transaction_id`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8;
-
- -- the table to store BranchSession data
- CREATE TABLE IF NOT EXISTS `seata`.`branch_table`
- (
- `branch_id` BIGINT NOT NULL,
- `xid` VARCHAR(128) NOT NULL,
- `transaction_id` BIGINT,
- `resource_group_id` VARCHAR(32),
- `resource_id` VARCHAR(256),
- `branch_type` VARCHAR(8),
- `status` TINYINT,
- `client_id` VARCHAR(64),
- `application_data` VARCHAR(2000),
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY (`branch_id`),
- KEY `idx_xid` (`xid`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8;
-
- -- the table to store lock data
- CREATE TABLE IF NOT EXISTS `seata`.`lock_table`
- (
- `row_key` VARCHAR(128) NOT NULL,
- `xid` VARCHAR(96),
- `transaction_id` BIGINT,
- `branch_id` BIGINT NOT NULL,
- `resource_id` VARCHAR(256),
- `table_name` VARCHAR(32),
- `pk` VARCHAR(36),
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY (`row_key`),
- KEY `idx_branch_id` (`branch_id`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8;
registry.conf
- # registry section: type "file" is selected and points at file.conf
- registry {
- # supported types: file, nacos, eureka, redis, zk, consul
- type = "file"
-
- file {
- name = "file.conf"
- }
-
- }
-
- # config section: also uses type "file" with file.conf
- config {
- type = "file"
-
- file {
- name = "file.conf"
- }
- }
bootstrap.yml
- spring:
-   cloud:
-     alibaba:
-       seata:
-         tx-service-group: imooc-ecommerce # Seata global transaction group
- import com.zaxxer.hikari.HikariDataSource;
- import io.seata.rm.datasource.DataSourceProxy;
- import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- import javax.sql.DataSource;
-
- /**
-  * <h1>Data source proxy configuration required by Seata</h1>
-  */
- @Configuration
- public class DataSourceProxyAutoConfiguration {
-
-     /** Spring Boot data source settings (url, username, password, driver class). */
-     private final DataSourceProperties dataSourceProperties;
-
-     public DataSourceProxyAutoConfiguration(DataSourceProperties dataSourceProperties) {
-         this.dataSourceProperties = dataSourceProperties;
-     }
-
-     /**
-      * <h2>Primary data source, wrapped in Seata's proxy for global transaction rollback</h2>
-      * before image + after image -> undo_log
-      *
-      * @return a {@link DataSourceProxy} around a Hikari pool built from the
-      *         configured url, credentials and driver class
-      */
-     @Primary
-     @Bean("dataSource")
-     public DataSource dataSource() {
-
-         HikariDataSource hikari = new HikariDataSource();
-         hikari.setJdbcUrl(dataSourceProperties.getUrl());
-         hikari.setUsername(dataSourceProperties.getUsername());
-         hikari.setPassword(dataSourceProperties.getPassword());
-         hikari.setDriverClassName(dataSourceProperties.getDriverClassName());
-
-         DataSource proxied = new DataSourceProxy(hikari);
-         return proxied;
-     }
- }
- /**
-  * <h2>Register the MVC interceptors</h2>
-  *
-  * @param registry the registry the interceptors are added to
-  */
- @Override
- protected void addInterceptors(InterceptorRegistry registry) {
-
-     // Unified login interceptor that resolves the user identity; applied to every path, order 0
-     LoginUserInfoInterceptor loginInterceptor = new LoginUserInfoInterceptor();
-     registry.addInterceptor(loginInterceptor)
-             .addPathPatterns("/**")
-             .order(0);
-
-     // Seata passes the xid (transaction id) between microservices; only then
-     // do the other services write undo_log and become able to roll back
-     SeataHandlerInterceptor seataInterceptor = new SeataHandlerInterceptor();
-     registry.addInterceptor(seataInterceptor)
-             .addPathPatterns("/**");
- }
首先引入依赖,添加配置文件(在创建微服务时已经引入,看上面)
这里使用的是自定义通道,所以要创建自定义通道接口
- /**
-  * <h1>Custom output channel (Source) for logistics messages</h1>
-  */
- public interface LogisticsSource {
-
-     /** Name of the output channel. */
-     String OUTPUT = "logisticsOutput";
-
-     /**
-      * <h2>Logistics Source -> logisticsOutput</h2>
-      * The channel is named logisticsOutput, matching the binding in the yml file.
-      *
-      * @return the channel logistics messages are sent to
-      */
-     @Output(OUTPUT)
-     MessageChannel logisticsOutput();
- }
在公共服务内创建微服务之间传递的对象
- /**
-  * <h1>Logistics message sent when an order is created</h1>
-  */
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- @ApiModel(description = "Stream 物流消息对象")
- public class LogisticsMessage {
-
-     /** Primary key of the user row. */
-     @ApiModelProperty(value = "用户表主键 id")
-     private Long userId;
-
-     /** Primary key of the order row. */
-     @ApiModelProperty(value = "订单表主键 id")
-     private Long orderId;
-
-     /** Primary key of the user's address row. */
-     @ApiModelProperty(value = "用户地址表主键 id")
-     private Long addressId;
-
-     /** Extra remarks, stored as JSON. */
-     @ApiModelProperty(value = "备注信息(json 存储)")
-     private String extraInfo;
- }
结合上面所有的知识点,完成商品的下单和查询操作
- import com.alibaba.fastjson.JSON;
- import com.imooc.ecommerce.account.AddressInfo;
- import com.imooc.ecommerce.account.BalanceInfo;
- import com.imooc.ecommerce.common.TableId;
- import com.imooc.ecommerce.dao.EcommerceOrderDao;
- import com.imooc.ecommerce.entity.EcommerceOrder;
- import com.imooc.ecommerce.feign.AddressClient;
- import com.imooc.ecommerce.feign.NotSecuredBalanceClient;
- import com.imooc.ecommerce.feign.NotSecuredGoodsClient;
- import com.imooc.ecommerce.feign.SecuredGoodsClient;
- import com.imooc.ecommerce.filter.AccessContext;
- import com.imooc.ecommerce.goods.DeductGoodsInventory;
- import com.imooc.ecommerce.goods.SimpleGoodsInfo;
- import com.imooc.ecommerce.order.LogisticsMessage;
- import com.imooc.ecommerce.order.OrderInfo;
- import com.imooc.ecommerce.service.IOrderService;
- import com.imooc.ecommerce.source.LogisticsSource;
- import com.imooc.ecommerce.vo.PageSimpleOrderDetail;
- import io.seata.spring.annotation.GlobalTransactional;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections4.CollectionUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cloud.stream.annotation.EnableBinding;
- import org.springframework.data.domain.Page;
- import org.springframework.data.domain.PageRequest;
- import org.springframework.data.domain.Pageable;
- import org.springframework.data.domain.Sort;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Service;
-
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.function.Function;
- import java.util.stream.Collectors;
-
- /**
-  * <h1>Implementation of the order service</h1>
-  * Creates orders inside a Seata global transaction and assembles paged order
-  * views from the order table plus the goods and account services.
-  */
- @Slf4j
- @Service
- @EnableBinding(LogisticsSource.class)
- public class OrderServiceImpl implements IOrderService {
-
-     /** DAO of the order table. */
-     private final EcommerceOrderDao orderDao;
-
-     /** Feign clients. */
-     private final AddressClient addressClient;
-     private final SecuredGoodsClient securedGoodsClient;
-     private final NotSecuredGoodsClient notSecuredGoodsClient;
-     private final NotSecuredBalanceClient notSecuredBalanceClient;
-
-     /** Spring Cloud Stream source used to publish logistics messages. */
-     private final LogisticsSource logisticsSource;
-
-     public OrderServiceImpl(EcommerceOrderDao orderDao,
-                             AddressClient addressClient,
-                             SecuredGoodsClient securedGoodsClient,
-                             NotSecuredGoodsClient notSecuredGoodsClient,
-                             NotSecuredBalanceClient notSecuredBalanceClient,
-                             LogisticsSource logisticsSource) {
-         this.orderDao = orderDao;
-         this.addressClient = addressClient;
-         this.securedGoodsClient = securedGoodsClient;
-         this.notSecuredGoodsClient = notSecuredGoodsClient;
-         this.notSecuredBalanceClient = notSecuredBalanceClient;
-         this.logisticsSource = logisticsSource;
-     }
-
-     /**
-      * <h2>Create an order (distributed transaction)</h2>
-      * Any failed step throws, which rolls the global transaction back:
-      * 1. validate the request
-      * 2. create the order
-      * 3. deduct goods inventory
-      * 4. deduct the user's balance
-      * 5. publish the logistics message (Spring Cloud Stream + Kafka)
-      *
-      * @param orderInfo address id and goods items of the order
-      * @return a {@code TableId} holding the id of the new order
-      * @throws RuntimeException if the request is empty, the address is unknown,
-      *                          or any of the remote steps fails
-      */
-     @Override
-     @GlobalTransactional(rollbackFor = Exception.class)
-     public TableId createOrder(OrderInfo orderInfo) {
-
-         // An order without items would otherwise be persisted before failing later on
-         if (null == orderInfo || CollectionUtils.isEmpty(orderInfo.getOrderItems())) {
-             throw new RuntimeException("order info is empty");
-         }
-
-         // Fetch the address information
-         AddressInfo addressInfo = addressClient.getAddressInfoByTablesId(
-                 new TableId(Collections.singletonList(
-                         new TableId.Id(orderInfo.getUserAddress())))).getData();
-
-         // 1. Validate the request (goods are validated by the inventory deduction)
-         if (null == addressInfo || CollectionUtils.isEmpty(addressInfo.getAddressItems())) {
-             throw new RuntimeException("user address is not exist: "
-                     + orderInfo.getUserAddress());
-         }
-
-         // 2. Create the order
-         Long userId = AccessContext.getLoginUserInfo().getId();
-         EcommerceOrder newOrder = orderDao.save(
-                 new EcommerceOrder(
-                         userId,
-                         orderInfo.getUserAddress(),
-                         JSON.toJSONString(orderInfo.getOrderItems())
-                 )
-         );
-         log.info("create order success: [{}], [{}]", userId, newOrder.getId());
-
-         // 3. Deduct goods inventory; Boolean.TRUE.equals avoids an NPE on a null payload
-         Boolean deducted = notSecuredGoodsClient.deductGoodsInventory(
-                 orderInfo.getOrderItems()
-                         .stream()
-                         .map(OrderInfo.OrderItem::toDeductGoodsInventory)
-                         .collect(Collectors.toList())
-         ).getData();
-         if (!Boolean.TRUE.equals(deducted)) {
-             throw new RuntimeException("deduct goods inventory failure");
-         }
-
-         // 4. Deduct the user's balance
-         // 4.1 look up the goods and compute the total price
-         long balance = calculateTotalPrice(orderInfo.getOrderItems());
-
-         // 4.2 deduct the total price from the account
-         BalanceInfo balanceInfo = notSecuredBalanceClient.deductBalance(
-                 new BalanceInfo(userId, balance)
-         ).getData();
-         if (null == balanceInfo) {
-             throw new RuntimeException("deduct user balance failure");
-         }
-         log.info("deduct user balance: [{}], [{}]", newOrder.getId(),
-                 JSON.toJSONString(balanceInfo));
-
-         // 5. Publish the logistics message (Spring Cloud Stream + Kafka)
-         LogisticsMessage logisticsMessage = new LogisticsMessage(
-                 userId,
-                 newOrder.getId(),
-                 orderInfo.getUserAddress(),
-                 null // no extra remarks
-         );
-         if (!logisticsSource.logisticsOutput().send(
-                 MessageBuilder.withPayload(JSON.toJSONString(logisticsMessage)).build()
-         )) {
-             throw new RuntimeException("send logistics message failure");
-         }
-         log.info("send create order message to kafka with stream: [{}]",
-                 JSON.toJSONString(logisticsMessage));
-
-         // Return the order id
-         return new TableId(Collections.singletonList(new TableId.Id(newOrder.getId())));
-     }
-
-     /**
-      * <h2>Compute the total price of the order items</h2>
-      * Uses explicit checks instead of {@code assert}, which is disabled unless
-      * the JVM runs with -ea.
-      *
-      * @param orderItems non-empty goods items of the order
-      * @return the positive total price
-      * @throws RuntimeException if a goods id is unknown or the total is not positive
-      */
-     private long calculateTotalPrice(List<OrderInfo.OrderItem> orderItems) {
-
-         List<SimpleGoodsInfo> goodsInfos = notSecuredGoodsClient.getSimpleGoodsInfoByTableId(
-                 new TableId(
-                         orderItems.stream()
-                                 .map(o -> new TableId.Id(o.getGoodsId()))
-                                 .collect(Collectors.toList())
-                 )
-         ).getData();
-         if (CollectionUtils.isEmpty(goodsInfos)) {
-             throw new RuntimeException("goods info is not exist");
-         }
-
-         // The merge function keeps the first entry instead of failing on duplicate ids
-         Map<Long, SimpleGoodsInfo> goodsId2GoodsInfo = goodsInfos.stream()
-                 .collect(Collectors.toMap(
-                         SimpleGoodsInfo::getId, Function.identity(), (first, second) -> first));
-
-         long total = 0;
-         for (OrderInfo.OrderItem orderItem : orderItems) {
-             SimpleGoodsInfo goodsInfo = goodsId2GoodsInfo.get(orderItem.getGoodsId());
-             if (null == goodsInfo) {
-                 throw new RuntimeException("goods is not exist: " + orderItem.getGoodsId());
-             }
-             total += goodsInfo.getPrice() * orderItem.getCount();
-         }
-         if (total <= 0) {
-             throw new RuntimeException("order total price is invalid: " + total);
-         }
-
-         return total;
-     }
-
-     /**
-      * <h2>Fetch one page of the current user's orders</h2>
-      * A page holds 10 orders sorted by id in descending order.
-      *
-      * @param page 1-based page number; values &lt;= 0 are treated as page 1
-      * @return the orders of the page and whether more pages exist
-      */
-     @Override
-     public PageSimpleOrderDetail getSimpleOrderDetailByPage(int page) {
-
-         if (page <= 0) {
-             page = 1; // default to the first page
-         }
-
-         // Paging rule: 10 rows per page, ordered by id descending
-         Pageable pageable = PageRequest.of(page - 1, 10,
-                 Sort.by("id").descending());
-         Page<EcommerceOrder> orderPage = orderDao.findAllByUserId(
-                 AccessContext.getLoginUserInfo().getId(), pageable
-         );
-         List<EcommerceOrder> orders = orderPage.getContent();
-
-         // Nothing found: return an empty result right away
-         if (CollectionUtils.isEmpty(orders)) {
-             return new PageSimpleOrderDetail(Collections.emptyList(), false);
-         }
-
-         // Collect every goods id referenced by the orders of this page
-         Set<Long> goodsIdsInOrders = new HashSet<>();
-         for (EcommerceOrder order : orders) {
-             List<DeductGoodsInventory> goodsAndCount = JSON.parseArray(
-                     order.getOrderDetail(), DeductGoodsInventory.class
-             );
-             goodsIdsInOrders.addAll(goodsAndCount.stream()
-                     .map(DeductGoodsInventory::getGoodsId)
-                     .collect(Collectors.toSet()));
-         }
-
-         // Sanity check (only evaluated with -ea): an order should never be without goods
-         assert CollectionUtils.isNotEmpty(goodsIdsInOrders);
-
-         // More pages exist when the total page count exceeds the requested page
-         boolean hasMore = orderPage.getTotalPages() > page;
-
-         // Fetch the goods information
-         List<SimpleGoodsInfo> goodsInfos = securedGoodsClient.getSimpleGoodsInfoByTableId(
-                 new TableId(goodsIdsInOrders.stream()
-                         .map(TableId.Id::new).collect(Collectors.toList()))
-         ).getData();
-
-         // Fetch the address information
-         AddressInfo addressInfo = addressClient.getAddressInfoByTablesId(
-                 new TableId(orders.stream()
-                         .map(o -> new TableId.Id(o.getAddressId()))
-                         .distinct().collect(Collectors.toList()))
-         ).getData();
-
-         // Combine goods and address information into the order view
-         return new PageSimpleOrderDetail(
-                 assembleSimpleOrderDetail(orders, goodsInfos, addressInfo),
-                 hasMore
-         );
-     }
-
-     /**
-      * <h2>Assemble the order details</h2>
-      * Missing remote data (null payloads) is treated as empty, so affected
-      * orders get the -1 placeholder address or goods entries.
-      */
-     private List<PageSimpleOrderDetail.SingleOrderItem> assembleSimpleOrderDetail(
-             List<EcommerceOrder> orders, List<SimpleGoodsInfo> goodsInfos,
-             AddressInfo addressInfo
-     ) {
-         List<SimpleGoodsInfo> safeGoodsInfos = null == goodsInfos
-                 ? Collections.<SimpleGoodsInfo>emptyList() : goodsInfos;
-         List<AddressInfo.AddressItem> safeAddressItems =
-                 (null == addressInfo || null == addressInfo.getAddressItems())
-                         ? Collections.<AddressInfo.AddressItem>emptyList()
-                         : addressInfo.getAddressItems();
-
-         // goodsId -> SimpleGoodsInfo
-         Map<Long, SimpleGoodsInfo> id2GoodsInfo = safeGoodsInfos.stream()
-                 .collect(Collectors.toMap(
-                         SimpleGoodsInfo::getId, Function.identity(), (first, second) -> first));
-         // addressId -> AddressInfo.AddressItem
-         Map<Long, AddressInfo.AddressItem> id2AddressItem = safeAddressItems.stream()
-                 .collect(Collectors.toMap(
-                         AddressInfo.AddressItem::getId, Function.identity(),
-                         (first, second) -> first));
-
-         List<PageSimpleOrderDetail.SingleOrderItem> result = new ArrayList<>(orders.size());
-         for (EcommerceOrder order : orders) {
-
-             PageSimpleOrderDetail.SingleOrderItem orderItem =
-                     new PageSimpleOrderDetail.SingleOrderItem();
-             orderItem.setId(order.getId());
-             orderItem.setUserAddress(id2AddressItem.getOrDefault(order.getAddressId(),
-                     new AddressInfo.AddressItem(-1L)).toUserAddress());
-             orderItem.setGoodsItems(buildOrderGoodsItem(order, id2GoodsInfo));
-
-             result.add(orderItem);
-         }
-
-         return result;
-     }
-
-     /**
-      * <h2>Build the goods entries of one order</h2>
-      * Goods ids missing from the map are represented by a {@code SimpleGoodsInfo(-1L)}.
-      */
-     private List<PageSimpleOrderDetail.SingleOrderGoodsItem> buildOrderGoodsItem(
-             EcommerceOrder order, Map<Long, SimpleGoodsInfo> id2GoodsInfo
-     ) {
-
-         List<PageSimpleOrderDetail.SingleOrderGoodsItem> goodsItems = new ArrayList<>();
-         List<DeductGoodsInventory> goodsAndCount = JSON.parseArray(
-                 order.getOrderDetail(), DeductGoodsInventory.class
-         );
-
-         for (DeductGoodsInventory gc : goodsAndCount) {
-
-             PageSimpleOrderDetail.SingleOrderGoodsItem goodsItem =
-                     new PageSimpleOrderDetail.SingleOrderGoodsItem();
-             goodsItem.setCount(gc.getCount());
-             goodsItem.setSimpleGoodsInfo(id2GoodsInfo.getOrDefault(gc.getGoodsId(),
-                     new SimpleGoodsInfo(-1L)));
-
-             goodsItems.add(goodsItem);
-         }
-
-         return goodsItems;
-     }
- }
创建物流微服务
引入依赖
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-     <parent>
-         <artifactId>e-commerce-service</artifactId>
-         <groupId>com.taluohui.ecommerce</groupId>
-         <version>1.0-SNAPSHOT</version>
-     </parent>
-     <modelVersion>4.0.0</modelVersion>
-
-     <artifactId>e-commerce-logistics-service</artifactId>
-     <version>1.0-SNAPSHOT</version>
-     <packaging>jar</packaging>
-
-     <!-- Module name and description -->
-     <name>e-commerce-logistics-service</name>
-     <description>物流服务</description>
-
-     <properties>
-         <maven.compiler.source>8</maven.compiler.source>
-         <maven.compiler.target>8</maven.compiler.target>
-     </properties>
-
-     <dependencies>
-         <!-- Spring Cloud Alibaba Nacos service discovery -->
-         <dependency>
-             <groupId>com.alibaba.cloud</groupId>
-             <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
-         </dependency>
-         <!-- zipkin = spring-cloud-starter-sleuth + spring-cloud-sleuth-zipkin -->
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-starter-zipkin</artifactId>
-         </dependency>
-         <dependency>
-             <groupId>org.springframework.kafka</groupId>
-             <artifactId>spring-kafka</artifactId>
-             <version>2.5.0.RELEASE</version>
-         </dependency>
-         <!-- Java Persistence API (the ORM specification) -->
-         <dependency>
-             <groupId>org.springframework.boot</groupId>
-             <artifactId>spring-boot-starter-data-jpa</artifactId>
-         </dependency>
-         <!-- Spring Cloud Stream + Kafka binder -->
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-stream</artifactId>
-         </dependency>
-         <dependency>
-             <groupId>org.springframework.cloud</groupId>
-             <artifactId>spring-cloud-stream-binder-kafka</artifactId>
-         </dependency>
-         <!-- MySQL driver; its version has to match the MySQL server version -->
-         <dependency>
-             <groupId>mysql</groupId>
-             <artifactId>mysql-connector-java</artifactId>
-             <version>8.0.12</version>
-             <scope>runtime</scope>
-         </dependency>
-         <dependency>
-             <groupId>com.taluohui.ecommerce</groupId>
-             <artifactId>e-commerce-service-config</artifactId>
-             <version>1.0-SNAPSHOT</version>
-         </dependency>
-         <dependency>
-             <groupId>com.taluohui.ecommerce</groupId>
-             <artifactId>e-commerce-service-sdk</artifactId>
-             <version>1.0-SNAPSHOT</version>
-         </dependency>
-     </dependencies>
-
-     <!--
-         Spring Boot Maven plugin: repackages the application as an executable
-         jar or war that can then be run in the usual way.
-     -->
-     <build>
-         <!-- ${artifactId} is a deprecated expression; ${project.artifactId} is the supported form -->
-         <finalName>${project.artifactId}</finalName>
-         <plugins>
-             <plugin>
-                 <groupId>org.springframework.boot</groupId>
-                 <artifactId>spring-boot-maven-plugin</artifactId>
-                 <executions>
-                     <execution>
-                         <goals>
-                             <goal>repackage</goal>
-                         </goals>
-                     </execution>
-                 </executions>
-             </plugin>
-         </plugins>
-     </build>
- </project>
添加配置
# Configuration of the logistics microservice.
server:
  port: 8004

spring:
  main:
    allow-bean-definition-overriding: true # allow beans with the same name to override each other
  application:
    # The application name is also part of the Nacos config dataId (when config.prefix is empty).
    # It must identify THIS service: the original snippet reused the order service's name,
    # which would register logistics instances under the order service in Nacos.
    name: e-commerce-logistics-service
  cloud:
    stream:
      kafka:
        binder:
          brokers: 1.15.247.9:9092
          auto-create-topics: true
      bindings:
        # Binding name must match the channel used by @StreamListener("logisticsInput")
        logisticsInput:
          destination: e-commerce-topic # kafka topic
          content-type: text/plain
    alibaba:
      seata:
        tx-service-group: imooc-ecommerce # seata global transaction group
    nacos:
      # service registration and discovery
      discovery:
        enabled: true # set to false to disable Nacos registration and discovery
        server-addr: 1.15.247.9:8848
        # server-addr: 127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850 # Nacos server addresses (cluster)
        namespace: 22d40198-8462-499d-a7fe-dbb2da958648
  # sleuth + zipkin + kafka
  kafka:
    bootstrap-servers: 1.15.247.9:9092
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  sleuth:
    sampler:
      # ProbabilityBasedSampler strategy
      probability: 1.0 # sampling ratio, 1.0 means 100%, default is 0.1
      # RateLimitingSampler strategy: when a rate limit is configured,
      # spring.sleuth.sampler.probability has no effect
  zipkin:
    sender:
      type: kafka # default is web
    base-url: http://1.15.247.9:9411/
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: none
    properties:
      hibernate.show_sql: true
      hibernate.format_sql: true
    open-in-view: false
  datasource:
    # data source
    # NOTE(review): plaintext credentials are embedded below; externalize them
    # (environment variables / config server) before sharing or committing this file.
    url: jdbc:mysql://1.15.247.9:3306/ecommerce?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false
    username: root
    password: Cjw970404
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    # connection pool
    hikari:
      maximum-pool-size: 8
      minimum-idle: 4
      idle-timeout: 30000
      connection-timeout: 30000
      max-lifetime: 45000
      auto-commit: true
      pool-name: ImoocEcommerceHikariCP

# exposed actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
入口程序
- /**
- * <h1>物流微服务启动入口</h1>
- * */
- @Import(DataSourceProxyAutoConfiguration.class)
- @EnableJpaAuditing
- @EnableDiscoveryClient
- @SpringBootApplication
- public class LogisticsApplication {
-
- public static void main(String[] args) {
-
- SpringApplication.run(LogisticsApplication.class, args);
- }
- }
SQL
-- Create the t_ecommerce_logistics table (logistics records) if it does not exist yet.
CREATE TABLE IF NOT EXISTS `ecommerce`.`t_ecommerce_logistics` (
    `id`          BIGINT(20)   NOT NULL AUTO_INCREMENT                 COMMENT '自增主键',
    `user_id`     BIGINT(20)   NOT NULL DEFAULT 0                      COMMENT '用户 id',
    `order_id`    BIGINT(20)   NOT NULL DEFAULT 0                      COMMENT '订单 id',
    `address_id`  BIGINT(20)   NOT NULL DEFAULT 0                      COMMENT '用户地址记录 id',
    `extra_info`  VARCHAR(512) NOT NULL                                COMMENT '备注信息(json 存储)',
    `create_time` DATETIME     NOT NULL DEFAULT '0000-01-01 00:00:00'  COMMENT '创建时间',
    `update_time` DATETIME     NOT NULL DEFAULT '0000-01-01 00:00:00'  COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 10
  DEFAULT CHARSET = utf8
  COMMENT = '物流表';
引入seata的两个配置文件,同上
实体类
- /**
- * <h1>物流表实体类定义</h1>
- * */
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- @Entity
- @EntityListeners(AuditingEntityListener.class)
- @Table(name = "t_ecommerce_logistics")
- public class EcommerceLogistics {
- /** 自增主键 */
- @Id
- @GeneratedValue(strategy = GenerationType.IDENTITY)
- @Column(name = "id", nullable = false)
- private Long id;
-
- /** 用户 id */
- @Column(name = "user_id", nullable = false)
- private Long userId;
-
- /** 订单 id */
- @Column(name = "order_id", nullable = false)
- private Long orderId;
-
- /** 用户地址 id */
- @Column(name = "address_id", nullable = false)
- private Long addressId;
-
- /** 备注信息(json 存储) */
- @Column(name = "extra_info", nullable = false)
- private String extraInfo;
-
- /** 创建时间 */
- @CreatedDate
- @Column(name = "create_time", nullable = false)
- private Date createTime;
-
- /** 更新时间 */
- @LastModifiedDate
- @Column(name = "update_time", nullable = false)
- private Date updateTime;
-
- public EcommerceLogistics(Long userId, Long orderId, Long addressId, String extraInfo) {
-
- this.userId = userId;
- this.orderId = orderId;
- this.addressId = addressId;
- this.extraInfo = StringUtils.isNotBlank(extraInfo) ? extraInfo : "{}";
- }
- }
Dao
- /**
- * <h1>EcommerceLogistics Dao 接口定义</h1>
- * */
- public interface EcommerceLogisticsDao extends JpaRepository<EcommerceLogistics, Long> {
- }
service
- /**
- * <h1>物流服务实现</h1>
- * */
- @Slf4j
- @EnableBinding(LogisticsSink.class)
- public class LogisticsServiceImpl {
-
- private final EcommerceLogisticsDao logisticsDao;
-
- public LogisticsServiceImpl(EcommerceLogisticsDao logisticsDao) {
- this.logisticsDao = logisticsDao;
- }
-
- /**
- * <h2>订阅监听订单微服务发送的物流消息</h2>
- * */
- @StreamListener("logisticsInput")
- public void consumeLogisticsMessage(@Payload Object payload) {
-
- log.info("receive and consume logistics message: [{}]", payload.toString());
- LogisticsMessage logisticsMessage = JSON.parseObject(
- payload.toString(), LogisticsMessage.class
- );
- EcommerceLogistics ecommerceLogistics = logisticsDao.save(
- new EcommerceLogistics(
- logisticsMessage.getUserId(),
- logisticsMessage.getOrderId(),
- logisticsMessage.getAddressId(),
- logisticsMessage.getExtraInfo()
- )
- );
- log.info("consume logistics message success: [{}]", ecommerceLogistics.getId());
- }
- }
自定义消息通道
- /**
- * <h1>物流微服务启动入口</h1>
- * */
- @Import(DataSourceProxyAutoConfiguration.class)
- @EnableJpaAuditing
- @EnableDiscoveryClient
- @SpringBootApplication
- public class LogisticsApplication {
-
- public static void main(String[] args) {
-
- SpringApplication.run(LogisticsApplication.class, args);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。