赞
踩
1.创建文章,选择内容、封面、照片、定时发送时间
2.经过gateway,解析token将userid保存到请求头,服务的拦截器去解析userid放入到threadloacl
3.保存/修改文章、如果不是草稿,保存文章和图片的关系、发送异步请求
4.异步请求是将任务放到延时队列中,redis实现。
4.1 放入延时队列前,按照枚举类给创建任务的优先级、类型、参数(文章信息)等信息
4.2 调用shedule的addTask接口(shedule提供addTask、poll、cancleTask三个接口)
4.3 将数据加入到数据库,在根据发布时间加入到redis的list或者zset
4.4 两个定时任务,一个将db同步到redis(1min一次),一个将zset同步到list(1秒一次),因为 只有5min内执行的数据采访到redis,只有立即执行的数据才在list。
5.在wm服务中,有一个定时任务,每秒会调用shedule的poll接口去redis中获取一个任务,需要传递参数任务类型和优先级就行(每种任务是写死的,比如定时发布的任务类型和优先级都一样保存到枚举类中)。
6.wm获取到定时任务中的文章信息,然后调用自动审核服务的接口去审核文章。在审核功能中,审核成功后还要保存article到数据库,并且回填article_id到wm_news中,因为后面上下架文章对应的article有用。
7.当文章审核成功,会调用article服务的保存文章接口,会保存到ap_article、ap_article_config、ap_article_content表,之后在创建异步任务调用buildArticleToMinIo接口,他会将文章信息和内容通过freemarker框架生成html,然后保存到minio,将静态地址回填数据库。
8.当生成html后,并且把url保存到数据库后,就要把完整的信息存放到es中,供后面的搜索功能,即creatArtileEsIndex。
9.在实现这个功能时,也是将和es对应的实体类信息保存到kafka中,然后 在article中会有一个linster去监听并将信息加入到es。
10.至此,完整的创建文章结束
gateway中解析token获取userid,服务中(比如自媒体服务、搜索服务)创建拦截器去获取userid放入到threadlocal
网关,可以去看springcloud课程黑马的2023年的,挺好的
数据传输,ResponsResult的data类型是T,也可以是Object,在别的地方解析该data时,使用JSON.toJSONString将Object变成string,然后在JSON.parse解析到对应的类型。
可以处理自定义异常、任何类型异常,需要两个注解。
- package com.heima.common.exception;
-
-
- import com.heima.model.common.dtos.ResponseResult;
- import com.heima.model.common.enums.AppHttpCodeEnum;
- import com.sun.tools.internal.ws.wsdl.document.jaxws.Exception;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.web.bind.annotation.ControllerAdvice;
- import org.springframework.web.bind.annotation.ExceptionHandler;
- import org.springframework.web.bind.annotation.ResponseBody;
-
- @ControllerAdvice //控制器增强类
- @Slf4j
- public class ExceptionCatch {
-
- /**
- * 处理不可控异常
- * @param e
- * @return
- */
- @ExceptionHandler(Exception.class)
- @ResponseBody
- public ResponseResult exception(Exception e){
- e.printStackTrace();
- log.error("catch exception:{}",e.getMessage());
-
- return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR);
- }
-
- /**
- * 处理可控异常 自定义异常
- * @param e
- * @return
- */
- @ExceptionHandler(CustomException.class)
- @ResponseBody
- public ResponseResult exception(CustomException e){
- log.error("catch exception:{}",e);
- return ResponseResult.errorResult(e.getAppHttpCodeEnum());
- }
- }
登录时,查询到ap_user中的salt信息,将用户输入的pwd和salt拼接使用md5加密得到结果,如果这个结果和ap_user中保存的加密密码一致,则登陆成功,使用AppJwtUtil生成token返回前端。
在gateway服务中设置一个全局的filter,如果请求的uri包含login,则放行。否则,从request中获取token,并判断是否为空以及使用AppJwtUtil判断token是否有效,再进行放行。
mybatis在model模块,service依赖了model模块,service下的user模块是service的子工程,所以user不需要导入mybatis模块。
使用登录功能来展示模块划分,将user-service用到的model全部放在model服务下的com.heima.model.user包下 pojo和数据库打交道的实体类,dto前端传给后端的数据。
- package com.heima.app.gateway.filter;
-
- import com.heima.app.gateway.utils.AppJwtUtil;
- import io.jsonwebtoken.Claims;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.cloud.gateway.filter.GatewayFilterChain;
- import org.springframework.cloud.gateway.filter.GlobalFilter;
- import org.springframework.core.Ordered;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.server.reactive.ServerHttpRequest;
- import org.springframework.http.server.reactive.ServerHttpResponse;
- import org.springframework.stereotype.Component;
- import org.springframework.web.server.ServerWebExchange;
- import reactor.core.publisher.Mono;
-
- @Slf4j
- @Component
- public class AuthorizeFilter implements Ordered, GlobalFilter {
- @Override
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
- //1.获取request和response对象
- ServerHttpRequest request = exchange.getRequest();
- ServerHttpResponse response = exchange.getResponse();
- //2.判断是否登录
- if (request.getURI().getPath().contains("/login")) {
- return chain.filter(exchange);
- }
- //3.获取token
- String token = request.getHeaders().getFirst("token");
- //4.判断token是否存在
- if (StringUtils.isBlank(token)) {
- response.setStatusCode(HttpStatus.UNAUTHORIZED);
- return response.setComplete();
- }
- //5.判断token是否有效
- try {
- Claims claimsBody = AppJwtUtil.getClaimsBody(token);
- //是否是过期
- int result = AppJwtUtil.verifyToken(claimsBody);
- if(result == 1 || result == 2){
- response.setStatusCode(HttpStatus.UNAUTHORIZED);
- return response.setComplete();
- }
- }catch (Exception e){
- e.printStackTrace();
- response.setStatusCode(HttpStatus.UNAUTHORIZED);
- return response.setComplete();
- }
- //6.放行
- return chain.filter(exchange);
- }
-
- /**
- * 值越小,过滤器越先执行
- * @return
- */
- @Override
- public int getOrder() {
- return 0;
- }
- }
修改nginx配置即可。如下为整个流程
记住nginx的配置结构就行,详细过程查看md文档。
step1.前端发起请求。
浏览器搜索localhost:8882会找到nginx的页面app-web
- upstream heima-app-gateway{
- server localhost:51601;
- }
-
- server {
- listen 8881;
- location / {
- root D:/Projects/java/leadnews/app-web/;
- index index.html;
- }
-
- location ~/app/(.*) {
- proxy_pass http://heima-app-gateway/$1;
- proxy_set_header HOST $host; # 不改变源请求头的值
- proxy_pass_request_body on; #开启获取请求体
- proxy_pass_request_headers on; #开启获取请求头
- proxy_set_header X-Real-IP $remote_addr; # 记录真实发出请求的客户端IP
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; #记录代理信息
- }
- }
加入nginx部署在localhost,登录功能http://localhost:8881/app/user/api/v1/login/login_auth/
这个请求的app是在nginx中映射到网关,user是在网关的配置中映射到不同的微服务,
通过网关的配置进行映射
- spring:
- cloud:
- gateway:
- globalcors:
- add-to-simple-url-handler-mapping: true
- corsConfigurations:
- '[/**]':
- allowedHeaders: "*"
- allowedOrigins: "*"
- allowedMethods:
- - GET
- - POST
- - DELETE
- - PUT
- - OPTION
- routes:
- # 用户微服务
- - id: user
- uri: lb://leadnews-user
- predicates:
- - Path=/user/**
- filters:
- - StripPrefix= 1
- # 文章微服务
- - id: article
- uri: lb://leadnews-article
- predicates:
- - Path=/article/**
- filters:
- - StripPrefix= 1
- #搜索微服务
- - id: leadnews-search
- uri: lb://leadnews-search
- predicates:
- - Path=/search/**
- filters:
- - StripPrefix= 1
- #行为微服务
- - id: leadnews-behavior
- uri: lb://leadnews-behavior
- predicates:
- - Path=/behavior/**
- filters:
- - StripPrefix= 1
- #评论微服务
- - id: leadnews-comment
- uri: lb://leadnews-comment
- predicates:
- - Path=/comment/**
- filters:
- - StripPrefix= 1
查询列表就是很简单,看下面的sql就行了。
- <select id="loadArticleList" resultMap="resultMap">
- SELECT
- aa.*
- FROM
- `ap_article` aa
- LEFT JOIN ap_article_config aac ON aa.id = aac.article_id
- <where>
- and aac.is_delete != 1
- and aac.is_down != 1
- <!-- loadmore -->
- <if test="type != null and type == 1">
- and aa.publish_time <![CDATA[<]]> #{dto.minBehotTime}
- </if>
- <if test="type != null and type == 2">
- and aa.publish_time <![CDATA[>]]> #{dto.maxBehotTime}
- </if>
- <if test="dto.tag != '__all__'">
- and aa.channel_id = #{dto.tag}
- </if>
- </where>
- order by aa.publish_time desc
- limit #{dto.size}
- </select>
表结构详细见ppt
为什么要拆分表?
涉及下面三个接口。
MinIOConfigProperties表示需要配置的属性构成的类。 @ConfigurationProperties(prefix = "minio")表示将application.properties或application.yml文件中开头为minio的属性注入进去。
- package com.heima.file.config;
-
-
- import lombok.Data;
- import org.springframework.boot.context.properties.ConfigurationProperties;
-
- import java.io.Serializable;
-
- @Data
- @ConfigurationProperties(prefix = "minio") // 文件上传 配置前缀file.oss
- public class MinIOConfigProperties implements Serializable {
-
- private String accessKey;
- private String secretKey;
- private String bucket;
- private String endpoint;
- private String readPath;
- }
- minio:
- accessKey: minio
- secretKey: minio123
- bucket: leadnews
- endpoint: http://192.168.200.130:9000
- readPath: http://192.168.200.130:9000
MinIOConfig类可以使用这个配置类(MinIOConfigProperties),并且可以将其声明为一个@Bean
- package com.heima.file.config;
-
- import com.heima.file.service.FileStorageService;
- import io.minio.MinioClient;
- import lombok.Data;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
- import org.springframework.boot.context.properties.EnableConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
- @Data
- @Configuration
- @EnableConfigurationProperties({MinIOConfigProperties.class})
- //当引入FileStorageService接口时
- @ConditionalOnClass(FileStorageService.class)
- public class MinIOConfig {
-
- @Autowired
- private MinIOConfigProperties minIOConfigProperties;
-
- @Bean
- public MinioClient buildMinioClient() {
- return MinioClient
- .builder()
- .credentials(minIOConfigProperties.getAccessKey(), minIOConfigProperties.getSecretKey())
- .endpoint(minIOConfigProperties.getEndpoint())
- .build();
- }
- }
业务逻辑:查看列表详情,实际上是minio这个分布式文件系统中拿到html,所以apArticle中要保存这个html的地址,下面的函数就是根据articleId去ap_article_content中找到文章内容,插入到freemarker模板中并生成html,将该html记录在minio中,并记录url到ap_article表中。实际上该功能需要在添加文章内容时就实现。
- package com.heima.article.test;
-
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.alibaba.fastjson.JSONArray;
- import com.heima.article.mapper.ApArticleContentMapper;
- import com.heima.article.mapper.ApArticleMapper;
- import com.heima.file.service.FileStorageService;
- import com.heima.model.article.pojos.ApArticle;
- import com.heima.model.article.pojos.ApArticleContent;
- import freemarker.template.Configuration;
- import freemarker.template.Template;
- import org.apache.commons.lang.StringUtils;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- import java.io.ByteArrayInputStream;
- import java.io.InputStream;
- import java.io.StringWriter;
- import java.util.HashMap;
- import java.util.Map;
-
- @SpringBootTest
- @RunWith(SpringRunner.class)
- public class ArticleFreemarkerTest {
-
- @Autowired
- private ApArticleContentMapper apArticleContentMapper;
-
- @Autowired
- private Configuration configuration;
-
- @Autowired
- private FileStorageService fileStorageService;
-
- @Autowired
- private ApArticleMapper apArticleMapper;
-
- /**
- * 已知文章id,生成内容的html模板文件。这里手动生成,后面会集成在添加文章功能里面。
- * @throws Exception
- * @author 徐路明
- */
- @Test
- public void createStaticUrlTest() throws Exception {
- // 1.接受文章内容
- LambdaQueryWrapper<ApArticleContent> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(ApArticleContent::getArticleId,1383827787629252610L);
- ApArticleContent apArticleContent = apArticleContentMapper.selectOne(queryWrapper);
- if (apArticleContent != null && StringUtils.isNotBlank(apArticleContent.getContent())) {
- StringWriter out = new StringWriter();
- //2.文章内容通过freemarker生成html文件
- Template template = configuration.getTemplate("article.ftl");
- // 将数据模型放到map中
- Map<String,Object> content = new HashMap<>();
- content.put("content", JSONArray.parseArray(apArticleContent.getContent()));
- // 合成
- template.process(content, out);
-
- //3.把html上传到minio中
- InputStream is = new ByteArrayInputStream(out.toString().getBytes());
- String path = fileStorageService.uploadHtmlFile("", apArticleContent.getArticleId() + ".html", is);
-
- //4.修改ap_article表,保存static_url字段
- ApArticle article = new ApArticle();
- article.setId(apArticleContent.getArticleId());
- article.setStaticUrl(path);
- apArticleMapper.updateById(article);
- }
-
- }
- }
执行前后的图如下:
访问该链接
第一步,添加登录功能以及gateway的filter,和app登录一模一样。
表结构:
wm_user:用户表
wm_channel: 频道表,存放所有频道(即文章类型)
wm_material:存放素材,图片、视频之类的,保存它们的urlvm_news:保存文章
wm_news_material 文章素材关系表
前端上传请求首先会经过网关判断登录状态,因为后面要用到当前登录userid的信息,所以才要保存到header供后面使用。之所以app端没有这一操作,是因为当前不需要使用到登录user的相关信息。首先,从gateway网关使用serverHttpRequest中存放userId到header,其中userId从token解析出来。然后到自媒体服务中,经过拦截器被拦截,获取userid存放到ThreadLocal中。
本章实现的功能包含:文章发布,其实就是上传一个文章并保存到数据库嘛,文章列表查询、频道列表查询等。
step1. 在gateway中将user信息解析保存到header头
- //获得token解析后中的用
- Object userId = claimsBody.get("id");
- //在header中添加新的信息
- ServerHttpRequest serverHttpRequest = request.mutate().headers(httpHeaders -> {
- httpHeaders.add("userId", userId + "");
- }).build();
- //重置header
- exchange.mutate().request(serverHttpRequest).build();
step2.添加interception和WebMvcConfig(缺一不可)
- @Configuration
- public class WebMvcConfig implements WebMvcConfigurer {
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new WmTokenInterceptor()).addPathPatterns("/**");
- }
- }
- package com.heima.wemedia.interceptor;
-
- import com.heima.model.wemedia.pojos.WmUser;
- import com.heima.utils.common.WmThreadLocalUtils;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.web.servlet.HandlerInterceptor;
- import org.springframework.web.servlet.ModelAndView;
-
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
-
- public class WmTokenInterceptor implements HandlerInterceptor {
-
- /**
- * 得到当前http请求的header中的user信息,保存到ThreadLocal中
- * @param request
- * @param response
- * @param handler
- * @return
- * @throws Exception
- * @author 徐路明
- */
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- String userId = request.getHeader("userId");
- if (StringUtils.isNotBlank(userId)) {
- WmUser wmUser = new WmUser();
- wmUser.setId(Integer.valueOf(userId));
- WmThreadLocalUtils.setUser(wmUser);
- }
- return true;
- }
-
- @Override
- public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
- WmThreadLocalUtils.clear();
- }
- }
看源码吧,没啥东西。将文件上传到minio,然后记录数据库到表wm_material。
一个知识点:分页查询。
- @Override
- public ResponseResult findList(WmMaterialDto dto) {
-
- dto.checkParam();
-
- IPage page = new Page(dto.getPage(),dto.getSize());
- LambdaQueryWrapper<WmMaterial> queryWrapper = new LambdaQueryWrapper<>();
- if (dto.getIsCollection() != null && dto.getIsCollection() == 1) {
- queryWrapper.eq(WmMaterial::getIsCollection,dto.getIsCollection());
- }
- queryWrapper.eq(WmMaterial::getUserId,WmThreadLocalUtils.getUser().getId());
- queryWrapper.orderByDesc(WmMaterial::getCreatedTime);
- page = page(page,queryWrapper);
-
- ResponseResult responseResult = new PageResponseResult(dto.getPage(),dto.getSize(),(int)page.getTotal());
- responseResult.setData(page.getRecords());
- return responseResult;
-
- }
改成mybatis如下:
- @Override
- public ResponseResult findList(WmMaterialDto dto) {
- dto.checkParam();
-
- // 构建分页参数
- int offset = (dto.getPage() - 1) * dto.getSize();
- int limit = dto.getSize();
-
- // 构建查询条件
- Map<String, Object> paramMap = new HashMap<>();
- paramMap.put("userId", WmThreadLocalUtils.getUser().getId());
- if (dto.getIsCollection() != null && dto.getIsCollection() == 1) {
- paramMap.put("isCollection", dto.getIsCollection());
- }
-
- // 查询总记录数
- int total = wmMaterialMapper.countByParams(paramMap);
-
- // 查询当前页数据
- List<WmMaterial> records = wmMaterialMapper.findListByParams(paramMap, offset, limit);
-
- // 构建分页响应结果
- ResponseResult responseResult = new PageResponseResult(dto.getPage(), dto.getSize(), total);
- responseResult.setData(records);
- return responseResult;
- }
- <!-- wmMaterialMapper.xml -->
- <select id="countByParams" resultType="int">
- SELECT COUNT(*)
- FROM wm_material
- WHERE user_id = #{userId}
- <if test="isCollection != null">
- AND is_collection = #{isCollection}
- </if>
- </select>
-
- <select id="findListByParams" resultMap="WmMaterialResultMap">
- SELECT *
- FROM wm_material
- WHERE user_id = #{userId}
- <if test="isCollection != null">
- AND is_collection = #{isCollection}
- </if>
- ORDER BY created_time DESC
- LIMIT #{offset}, #{limit}
- </select>
- package com.heima.wemedia.service.impl;
-
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.heima.model.common.dtos.ResponseResult;
- import com.heima.model.wemedia.pojos.WmChannel;
- import com.heima.wemedia.mapper.WmChannelMapper;
- import com.heima.wemedia.service.WmChannelService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- @Service
- @Transactional
- @Slf4j
- public class WmChannelServiceImpl extends ServiceImpl<WmChannelMapper, WmChannel> implements WmChannelService {
-
-
- /**
- * 查询所有频道
- * @return
- */
- @Override
- public ResponseResult findAll() {
- return ResponseResult.okResult(list());
- }
- }
业务问题:
删除素材表中的素材时,如果该素材背文章引用,则无法删除。
封面有单图、三图、无图、自动四种模式,自动的话选择文章内容中的图片。
事实上,不需要wm_news_material也可以展示一篇文章,因为wm_news中有对应的图片信息保存在content字段中,之所以加这个表,是为了判断删除素材时不能删除已经有文章用到的素材。
创建controller时,不需要对每一个实体类创建,比如wm_news_material就没有对应的,但需要给wm_news_material创建对应的mapper,因为要和表wm_news_material进行交互。
文章发布思路如下:
提交和修改共用一个函数,首先会将内容保存到数据库,如果是保存则添加,如果是修改,则删除关联库后更新。再判断是否是草稿,如果是草稿则结束。如果不是草稿,则重新添加文章封面和素材的关系、文章内容和素材的关系。
前端发布的数据如下图所示:content部分是一个string类型json,文章和图片类型的列表。
这个功能只需要搞明白业务逻辑即可,没有什么知识点。无非就是集合、stream流、Json.parseArray、BeanUtils.copyProperties(dto, wmNews)、mybatis中xml等的基本用法。
当编写文章后,调用保存wm_news,然后会异步调用审核,审核成功后,会保存wm_article,在wm_article的save功能中,还会异步生成静态页面到minio,并将html的url存到文章表的字段中。
业务逻辑:当自媒体wm_news审核成功后,会保存或修改一条ap_article数据,并且wm_news的article_id字段为新插入ap数据的id。ap_article是用户端的数据。
无论是wm还是ap的都把保存和修改放在一个接口中,只需要判断id是否存在就行。
当vm服务远程调用添加文章到ap_article以及ap_article_content插入数据时,参数可以创建一个dto来extend apArticle,并添加属性content即可。
当wm服务调用ap服务时,步骤如下:
1.创建针对ap服务的feign客户端IArticleClient(interface),改结构有一个功能供ap调用、放在feign服务中
2.在article服务中创建ArticleClient(其实跟controller一样,名字不一样罢了)实现IArticleClient这个接口。
3.在article的service中去实现。
当数据库分库后,id采取自增可能导致id重复,因此分布式id可以采用雪花算法,id不会重复。
这一块记住业务逻辑即可,没什么难点。(详细见md文档)
1.导入依赖和定义远程调用接口(在feign的微服务中,类似于controller,但他是个接口)。这样其他微服务都只需要在调用fein中的接口即可。
- package com.heima.apis.article;
-
- import com.heima.model.article.dtos.ArticleDto;
- import com.heima.model.common.dtos.ResponseResult;
- import org.springframework.cloud.openfeign.FeignClient;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
-
- @FeignClient("leadnews-article")
- public interface IArticleClient {
-
- @PostMapping("api/v1/article/save")
- public ResponseResult saveArticle(@RequestBody ArticleDto dto);
- }
2. 实现上面定义的接口,这个实现在文章的微服务中实现,而定义feinclient接口是在fein微服务中。
- package com.heima.article.feign;
-
- import com.heima.apis.article.IArticleClient;
- import com.heima.article.service.ApArticleService;
- import com.heima.model.article.dtos.ArticleDto;
- import com.heima.model.common.dtos.ResponseResult;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class ArticleClient implements IArticleClient {
-
- @Autowired
- private ApArticleService apArticleService;
- @Override
- public ResponseResult saveArticle(@RequestBody ArticleDto dto) {
- return apArticleService.saveArticle(dto);
- }
- }
3. 在实现service就行。
需要在wm的application启动类添加@EnableFeignClients(basePackages = "com.heima.apis")。因为扫不到这个包,api的微服务中没有启动类去扫描这个包。
在wmmedia服务中编写。主要业务逻辑明白就行。
输入文章id->查询文章详情->审核图片、文字->更新wm_news中的status->如果失败返回,审核成功,调用之前写好的保存app文章。
详细见md文档
ps:springboot启动类会自动扫描启动类所在包及子包下的bean、controller、等。
当添加降级的实现类在**包下后,需要扫描到改包,可以添加配置类如下:
创建app文章时,要把静态文件上传到app中。也是异步操作。
整体流程如下:
定时2,同步数据流程如下:
schedule服务创建三个功能,分别是添加任务、拉取任务、取消任务,wm创建文章会调用添加任务,之后审核服务会有定时任务拉取任务完成审核。
可以看ppt,了解一下基础知识。三种技术的实现方式、面试题等
需要两个表来保存定时任务,一个任务表,一个任务执行的日志表
taskinfo_logs表有一个字段为version版本号
业务逻辑:当每次添加一个新任务的时候,要同时保存任务表以及日志表,日志表的版本号初始为1,状态也改成初始化状态。然后如果任务的执行时间小于当前时间,放入redis的list中,否则加入到zset中(原因见ppt)。
实现方式:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
- <!-- redis依赖commons-pool 这个依赖一定要添加 -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- </dependency>
略
知识点1:使用scan来进行模糊匹配key,这样效率高。
- public Set<String> scan(String patten){
- Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
- Set<String> result = new HashSet<>();
- try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
- .match(patten).count(10000).build())) {
- while (cursor.hasNext()) {
- result.add(new String(cursor.next()));
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- });
- return keys;
- }
知识点2:redis管道。
- public List<Object> lRightPushPipeline(String type,Collection<String> values){
- List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
- public Object doInRedis(RedisConnection connection) throws DataAccessException {
- StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
- //集合转换数组
- String[] strings = values.toArray(new String[values.size()]);
- //直接批量发送
- stringRedisConn.rPush(type, strings);
- return null;
- }
- });
- return results;
- }
-
- public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
-
- List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
- @Nullable
- @Override
- public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
- StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
- String[] strings = values.toArray(new String[values.size()]);
- stringRedisConnection.rPush(topic_key,strings);
- stringRedisConnection.zRem(future_key,strings);
- return null;
- }
- });
- return objects;
- }
下面两个图分别是普通的交互和管道交互图。
这里要注意,每次把db中的taskinfo同步到redis之前,要清空redis,防止重复将一个任务放到redis中。
6.9 发布文章
step1.创建feign的三个接口,添加、取消、按照类型和优先级拉取任务。
略。指导topic、brake、分布式、订阅模式和p2p模式、group等概念。
kafka:为什么支持分布式的功能,因为里面是有topic,支持分区的概念。所以topic A可以存在不同的节点上面。就可以支持海量数据和高并发,提升性能和吞吐量
文章上下架是指:App端的文章上下架,自媒体端选择上下架,修改wm_news表数据,然后放入kafka异步来下架app端和修改app端数据。ap_article_config中可以修改。
wm_news表中的enable字段也表示文章上下架。
生产者:在自媒体的商家和下架功能中写入。
- if (wmNews.getArticleId()!=null) {
- Map<String, Object> map = new HashMap<>();
- map.put("articleId",wmNews.getArticleId());
- map.put("enable",dto.getEnable());
- kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC, JSON.toJSONString(map));
- }
消费者:只需要指定订阅主题topic,而group(一个group的许多消费者只能接受一个信息,就是一条信息如果多个消费者都要使用,只能设置这几个消费者不同的group)这种信息保存在配置中。
- package com.heima.article.listener;
-
- import com.alibaba.fastjson.JSON;
- import com.heima.article.service.ApArticleConfigService;
- import com.heima.article.service.ApArticleService;
- import com.heima.common.constants.WmNewsMessageConstants;
- import com.heima.model.article.pojos.ApArticleConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Component
- @Slf4j
- public class ArticleIsDownListener {
-
- @Autowired
- private ApArticleConfigService apArticleConfigService;
-
- @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
- public void onMessage(String message) {
- if (!StringUtils.isBlank(message)) {
- Map map = JSON.parseObject(message, Map.class);
- apArticleConfigService.updateByMap(map);
- }
- }
-
- }
文章创建后会审核,审核成功后会创建article,创建article时会创建html然后保存到minio,
可以在把html文件上传到minio中时发送消息、创建索引,因为此时已经有了静态url信息。
搜索无非就是从es中获取数据。
基于用户的,保存在MongoDB
详细见md文档
问题:将热点的文章数据放入redis,热点是由点赞量、浏览量等计算。
因为是定时任务去计算,因此可以使用分布式定时任务框架。
我对分布式调度的理解,是将xxljob部署在一台机器上,然后进入控制界面,创建执行器和任务,每个任务对应一个执行器,一个执行器对应多个任务。然后在我们自己的java代码中,配置这个xxljob部署的信息,包括address、端口、密码、执行器,之后就可以连接到部署的xxljob上,然后编写业务代码,并且选择对应的在xxljob的图形页面创建的job,之后就可以执行了。之后我们修改定时任务的cron、查看日志等都只需要在部署的xxljob上去修改,而不需要修改本身的业务代码。 如果多个服务启动,也有相应的策略,如轮询去执行定时任务,不会出现每一个服务都同时执行任务的情况。具体再看一篇黑马头条的day10吧。
ap_article中有字段点赞、阅读、评论、收藏。
流程如下:将排好序的30条数据放入redis,用户首次load时会从redis读取热点数据,执行readmore或者readnews之后会从mysql获取。
kafkaStream相当于对生产者创建的信息进行处理后交给消费者。
当用户发起点赞行为和阅读行为后,会将该行为包装成消息(mess:包含行为类型、文章id、add的数量,可以为负数)发送给kafka stream,他会定时计算一段时间内的数据并进行聚合,转化为key是articleId,最后发送给article的kafka,article服务从中接受复合的数据(,包含文章id、点赞、阅读、收藏等的变化值,)该服务会重新计算分值并更新数据库。将分值计算后和redis进行比较来替换文章。因此我认为stream kafka就是将不断地数据进行格式处理和其他处理后再交给另一个服务进行业务。
kafka:1. 自媒体端上下架后放入kafka,app端拿数据进行上下架
2. 文章创建后会审核,审核成功加入消息队列,等待同步到es中。
3.将点赞信息传递到kafka stream进行计算
4.将计算后的数据放入kafka,然后供article 服务进行计算分数
redis:1.使用zset和list实现定时任务
2.保存点赞、收藏、关注、浏览的信息
3. app端首页按照分值排序的30个文章信息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。