赞
踩
springCloud提供了RestTemplate,可以发起远程http协议的调用
使用
注入bean
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
restTemplate使用
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private RestTemplate restTemplate; private static final String UserBaseApiURL = "http://localhost:8081/user/"; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class); order.setUser(user); return order; } }
消费者该如何获取服务提供者具体信息?
如果有多个服务提供者,消费者该如何选择?
消费者如何感知服务提供者的健康状态?
创建项目引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
编写application.yml配置文件
server:
port: 10086
spring:
application:
name: eureka-server # 服务名
eureka:
client:
service-url: # 服务地址
defaultZone: http://127.0.0.1:10086/eureka/
在启动类上开启自动装配
@EnableEurekaServer // 开启eureka自动装配
@SpringBootApplication
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class , args);
}
}
引入eureka-client依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
在application.yml中配置eureka地址
spring:
datasource:
url: jdbc:mysql://localhost:3306/cloud_order?useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
application:
name: orderservice # 服务名
eureka:
client:
service-url: # 服务地址
defaultZone: http://127.0.0.1:10086/eureka/
给RestTemplate添加@LoadBalanced注解
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
用服务提供者的服务名称远程调用
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private RestTemplate restTemplate; private static final String UserBaseApiURL = "http://userservice/user/"; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class); order.setUser(user); return order; } }
代码方式,注入IRule实例bean
@Bean
public IRule randomIRule() {
return new RandomRule();
}
2.配置文件方式: 在application.yml文件中添加配置项
userservice:
ribbon:
NFLoadBalancerRuleClassName: com.netfix.loadbalancer.RandomRule
ribbon默认采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长
在application.yml中开启饥饿加载
ribbon:
eager-load:
enabled: true # 开启饥饿加载
clients:
- userservice # 指定对userservice这个服务饥饿加载
引入依赖
<!-- nacos的管理依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- nacos客户端依赖包 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
修改application.yml文件nacos配置
spring:
application:
name: orderservice # 服务名
cloud:
nacos:
server-addr: localhost:8848 # nacos端口号
nacos默认负载均衡策略为服务轮询,一个服务往往会在多地部署多个实例,相同区域内的服务之间相互调用时长消耗更短,因此区域内优先调用比较合理,nacos提供了这样的负载均衡策略,不过区域内的策略为随机策略,当区域内没有可用服务时再访问其他区域的可用服务。
application.yml配置区域
spring:
application:
name: userservice # 服务名
cloud:
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ # 集群名称
为Ribbon配置区域优先策略
userservice:
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule
不同的时期有不同的环境,比如开发时需要开发环境,namespace就可以进行环境隔离,不同环境之间的服务无法调用。
application.yaml 配置namespace
spring:
application:
name: orderservice # 服务名
cloud:
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ # 集群名称
namespace: fd3445b3-8e01-446a-91d5-03ab8b5a7205 # 环境id(从nacos控制台查看)
共同点
区别
Nacos支持服务端主动监测提供者状态:临时实例采用心跳模式,非临时实例采用主动健康监测。
spring:
application:
name: orderservice # 服务名
cloud:
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ # 集群名称
ephemeral: false # 非临时实例
临时实例心跳不正常会被剔除,非临时实例则不会被剔除。
Nacos支持服务列表变更的消息推送模式,服务列表更新及时。
Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP模式。
在控制台添加配置文件
添加依赖
<!-- nacos配置管理依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
在bootstrap.yml文件中配置
spring:
application:
name: userservice
profiles:
active: dev # 开发环境
cloud:
nacos:
server-addr: localhost:8848
config:
file-extension: yaml # 文件后缀
使用@Value注解注入
@Value("${pattern.format}")
private String format;
通过@Value方式注入的配置属性需要在类上添加@RefreshScope注解即可实现配置热更新
@Slf4j
@RestController
@RequestMapping("/user")
@RefreshScope // 配置热更新注解
public class UserController {
@Autowired
private UserService userService;
// 配置属性注入
@Value("${pattern.format}")
private String format;
}
通过@ConfigurationProperties方式注入的属性自动热更新
@Component
@ConfigurationProperties(prefix = "pattern")
@Data
public class PatternProperties {
private String format;
}
引入依赖
<!-- fegin依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
启动类添加@EnableFeignClients注解
@MapperScan("cn.itcast.order.mapper")
@SpringBootApplication
@EnableFeignClients
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
编写FeignClient接口
@FeignClient("userservice")
public interface UserClient {
@GetMapping("user/{id}")
User findById(@PathVariable("id") Long id);
}
使用FeignClient中定义的方法代替RestTemplate
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private UserClient userClient; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = userClient.findById(order.getUserId()); order.setUser(user); return order; } }
方式一是配置文件
feign:
client:
config:
default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置
logger-level: FULL # 日志级别
方式二是JAVA代码配置类
public class FeignClientConfiguration {
@Bean
public Logger.Level feignLogLevel(){
return Logger.Level.FULL;
}
}
@MapperScan("cn.itcast.order.mapper")
@SpringBootApplication
// 在启动类上添加的配置类属于全局配置
@EnableFeignClients(defaultConfiguration = FeignAutoConfiguration.class)
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
// 如果需要对某一服务进行配置在服务接口上添加即可
@FeignClient(value = "userservice" , configuration = FeignClientConfiguration.class)
public interface UserClient {
@GetMapping("user/{id}")
User findById(@PathVariable("id") Long id);
}
引入依赖
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
配置连接池
feign:
client:
config:
default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置
logger-level: FULL # 日志级别
httpclient:
enabled: true # 开启feign对HttpClient的支持
max-connections: 200 # 最大连接数
max-connections-per-route: 50 # 每个路径的最大连接数
创建新的module,引入SpringCloudGateWay的依赖和服务发现依赖
<dependencies>
<!-- nacos服务发现依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 网关gateway依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
</dependencies>
编写路由配置及nacos地址
server: port: 10010 spring: application: name: gateway cloud: nacos: server-addr: localhost:80 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
为某个服务添加过滤器
spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求 filters: - AddRequestHeader=color, blue # 局部过滤器
添加默认过滤器(全局)
spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求 default-filters: - AddRequestHeader=color, blue # 全局过滤器
@Order(1) // 过滤器等级 越低优先值越高
@Component
public class AuthorizeFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
HttpHeaders headers = request.getHeaders();
String token = headers.getFirst("token");
if (token != null && token.equals("abc")) {
return chain.filter(exchange);
}
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
}
引入AMQP的Starter依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ地址
spring:
rabbitmq:
addresses: 192.168.88.101 # 地址名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast
password: 123321
新建测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello , Spring amqp!";
rabbitTemplate.convertAndSend(queueName , message);
}
}
配置地址
spring:
rabbitmq:
addresses: 192.168.88.101 # 地址名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast
password: 123321
新建类
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") // 在启动前要确保该队列存在!
public void listenSimpleQueue(String msg) {
System.out.println("消费者1接收到消息 = " + msg);
}
}
多个模型绑定到一个队列,用一个消息会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
addresses: 192.168.88.101 # 地址名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast
password: 123321
listener:
simple:
prefetch: 1 # 修改消费者提前把握的最大数量
示例:
设置发布50条消息
@Test
public void testSimpleWorkQueue() {
String queueName = "simple.queue";
for (int i = 0; i < 50; i++) {
String message = "hello , Spring amqp! - " + (i + 1);
rabbitTemplate.convertAndSend(queueName , message);
}
}
两个接收者
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者1接收到消息 = " + msg);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息 = " + msg);
Thread.sleep(500);
}
}
特点:
编写配置类
@Configuration public class FanoutConfig { // 声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } // 声明第一个队列 @Bean public Queue queue1() { return new Queue("fanout.queue1"); } // 绑定队列1和交换机 @Bean public Binding bindingQueue1(Queue queue1 , FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue1).to(fanoutExchange); } // 声明第二个队列 @Bean public Queue queue2() { return new Queue("fanout.queue2"); } // 绑定队列2和交换机 @Bean public Binding bindingQueue2(Queue queue2 , FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue2).to(fanoutExchange); } }
编写消费者代码
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到消息 = " + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息 = " + msg);
}
特点:
案例
编写接收者,绑定交换机(注解方式)
@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "direct.queue1"), exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT), key = {"red" , "blue" } )) public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "direct.queue2"), exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT), key = {"red" , "pink" } )) public void listenDirectQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); }
编写发布者
@Test
public void testDirectQueue() {
String queueName = "simple.queue";
String message = "hello , Spring amqp!";
// 交换机name
String exchangeName = "itcast.direct";
rabbitTemplate.convertAndSend(exchangeName , "red" , message);
}
特点
示例:
编写接收者
@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "topic.queue1"), exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC), key = {"china.*" } )) public void listenTopicQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "topic.queue2"), exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC), key = {"china.*" } )) public void listenTopicQueue2(String msg) { System.err.println("消费者2接收到消息 = " + msg); }
编写发送者
@Test
public void testTopicQueue() {
String queueName = "simple.queue";
String message = "hello , Spring amqp!";
// 交换机name
String exchangeName = "itcast.topic";
rabbitTemplate.convertAndSend(exchangeName , "china.news" , message);
}
RabbieMQ是可以传递java对象的,通过MessageConverter实现,但是默认是JDk的序列化,最好为其配置JSON对象转化器,注意发布与接收方必须使用相同的MessageConverter
案例
消息发布方
引入JackSon依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.16.1</version>
</dependency>
注入Bean
@Bean // 注入json消息转化器
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
发布消息
@Test
public void ObjectQueue() {
String queueName = "object.queue";
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("name" , "小强");
hashMap.put("age" , "18");
rabbitTemplate.convertAndSend(queueName , hashMap);
}
消息接收方
引入JackSon依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.16.1</version>
</dependency>
注入Bean
@Bean // 注入json消息转化器
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
定义队列
@Bean
public Queue ObjectQueue() {
return new Queue("object.queue");
}
接收消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueue2(HashMap<String , Object> msg) {
System.err.println("消费者接收到消息 = " + msg);
}
什么是elasticsearch?
什么是elastic stack (ELK)?
mapping属性
索引库操作
创建索引库
PUT /索引库名
# 创建索引库 PUT /heima { "mappings": { "properties": { "info": { "type": "text", "analyzer": "ik_smart" }, "email" : { "type": "keyword", "index": false }, "name" : { "properties": { "firstName" : { "type" : "keyword", "index" : false }, "lastName" : { "type" : "keyword", "index" : false } } } } } }
查看索引库
GET /索引库名
# 查询索引库
GET /heima
删除索引库
DELETE /索引库名
# 删除索引库
DELETE /黑马
添加字段
PUT /索引库名/_mapping
# 添加新字段
PUT /heima/_mapping
{
"properties": {
"color" : {
"type" : "keyword",
"index" : false
}
}
}
添加文档
模板
POST /索引库名/_doc/文档id
{
"字段1" : "值1",
"字段2" : "值2",
"字段3" : {
"子属性1" : "值3",
"子属性2" : "值4",
}
}
示例
# 添加文档
POST /heima/_doc/1
{
"info" : "尚硅谷,让天下没有学完的技术!",
"email" : "1482939313@qq.com",
"name" : {
"firstName" : "尚",
"lastName" : "硅谷"
}
}
查看文档
模板
GET /索引库名/_doc/文档id
示例
GET /heima/_doc/1
删除文档
模板
DELETE /索引库名/_doc/文档id
示例
DELETE /heima/_doc/1
修改文档
方式一:全量修改
特点
id存在则修改,不存在则创建
模板
PUT /索引库名/_doc/文档id
{
"字段1" : "值1",
"字段2" : "值2",
}
方式二:局部修改
模板
POST /索引库名/_update/文档id
{
"doc" : {
"字段1" : "值1",
"字段2" : "值2",
}
}
引入依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
<!-- 注意版本控制 -->
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
初始化RestHighLevelClient
package cn.itcast.hotel; @SpringBootTest class HotelDemoApplicationTests { private RestHighLevelClient client; @BeforeEach void setUp() { this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.88.101:9200") )); } @AfterEach void tearDown() throws IOException { this.client.close(); } @Test void contextLoads() { System.out.println(this.client); } }
操作索引库
创建索引库
// 新建索引
@Test
public void addIndex() throws IOException {
// 创建Request对象
CreateIndexRequest request = new CreateIndexRequest("hotel");
// 准备请求参数 RestClientConstant.HOTELTEMPLATE 为新建索引的json结构
request.source(RestClientConstant.HOTELTEMPLATE , XContentType.JSON);
// 发送请求
client.indices().create(request , RequestOptions.DEFAULT);
}
删除和判断索引库
// 删除索引 @Test public void deleteIndex() throws IOException { // 创建Request对象 DeleteIndexRequest request = new DeleteIndexRequest("hotel"); // 发送请求 client.indices().delete(request , RequestOptions.DEFAULT); } // 判断索引库是否存在 @Test public void existsIndex() throws IOException { // 创建Request对象 GetIndexRequest request = new GetIndexRequest("hotel"); // 发送请求 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); System.out.println( exists ? "索引库存在" : "索引库不存在"); }
操作文档
新增文档
// 新增文档
@Test
public void addDocument() throws IOException {
// 获取信息
Hotel hotel = hotelService.getById(36934L);
HotelDoc hotelDoc = new HotelDoc(hotel);
// 创建Request对象 并设置id
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 设置请求
request.source(JSON.toJSONString(hotelDoc) , XContentType.JSON);
// 发送请求
client.index(request , RequestOptions.DEFAULT);
}
查询文档
// 新增文档
@Test
public void findDocument() throws IOException {
// 创建Request对象 并设置id
GetRequest request = new GetRequest("hotel").id("36934");
// 发送请求
GetResponse response = client.get(request, RequestOptions.DEFAULT);
String json = response.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println("hotelDoc = " + hotelDoc);
}
更新文档
// 更新文档
@Test
public void updateDocument() throws IOException {
// 创建Request对象 并设置id
UpdateRequest request = new UpdateRequest("hotel", "36934");
// 设置更新信息 每两个参数为一对 key,value
request.doc(
"city" , "美国"
);
// 发送请求
client.update(request , RequestOptions.DEFAULT);
}
删除文档
// 删除文档
@Test
public void deleteDocument() throws IOException {
// 创建Request对象 并设置id
DeleteRequest req = new DeleteRequest("hotel", "36934");
// 发送请求
client.delete(req , RequestOptions.DEFAULT);
}
批量操作
// 批量操作 @Test public void bulkDocument() throws IOException { // 获取数据 List<Hotel> hotelList = hotelService.list(); // 创建Request对象 BulkRequest bulkRequest = new BulkRequest(); // 写入 hotelList.stream().forEach( item -> { HotelDoc hotelDoc = new HotelDoc(item); bulkRequest.add(new IndexRequest("hotel") .id(item.getId().toString()).source(JSON.toJSONString(hotelDoc) , XContentType.JSON)); } ); // 发送请求 client.bulk(bulkRequest , RequestOptions.DEFAULT); }
基本语法
GET /indexName/_search
{
"query" : {
"查询类型" : {
"查询条件" : "条件值"
}
}
}
查询所有
GET /hotel/_search
{
"query" : {
“match_all” : {
}
}
}
全文检索查询
会对查询text进行分词,查询倒排索引
match和multi_match的区别:
match查询单字段,multi_match根据多个字段查询,参与查询字段越多,查询性能越差
# match查询 GET /hotel/_search { "query": { "match": { "all": "上海如家" } } } # multi_match查询 GET /hotel/_search { "query": { "multi_match": { "query": "上海如家", "fields": ["brand" , "name" , "business"] } } }
精确查询
# term查询 GET /hotel/_search { "query": { "term": { "brand": { "value": "如家" } J } } } # range查询 范围 GET /hotel/_search { "query": { "range": { "price": { "gte": 400, "lte": 500 } } } }
地理查询
geo_bounding_box:查询geo_point值落在某个矩形范围的所有文档
# geo_bounding_box 边界框查询 GET /hotel/_search { "query": { "geo_bounding_box" : { "location" : { "top_left" : { "lat" : 31.1, "lon" : 121.5 }, "bottom_right" : { "lat" : 30.9, "lon" : 121.7 } } } } }
geo_distance:查询到指定中心点小于某个距离值的所有文档
# geo_distance 距离中心点查询
GET /hotel/_search
{
"query": {
"geo_distance" : {
"distance" : "15km",
"location" : "31.21,121.5"
}
}
}
复合查询
function score query 可以修改文档的相关性算分(query socre),根据新得到的算分排序。
function score query定义的三要素
# function sorce GET /hotel/_search { "query": { "function_score": { "query": { "match": { "all": "上海" } }, "functions": [ { "filter": { "term": { "brand": "万豪" } }, "weight": 10 } ], "boost_mode": "multiply" } } }
复合查询Boolean Query是一个或多个查询子句的组合。子查询的组合方式有:
# 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 GET /hotel/_search { "query": { "bool": { "must": [ { "match": { "name": "如家" } } ], "must_not": [ { "range": { "price": { "gt": 400 } } } ], "filter": [ { "geo_distance": { "distance": "10km", "location": { "lat": 31.21, "lon": 121.5 } } } ] } } }
排序
elasticsearch默认根据相关度算分,也可以指定
简单类型
GET /hotel/_search
{
"query": {
"match": {
"all": "北京"
}
},
"sort": [
{
"price": {
"order": "desc"
}
}
]
}
地理坐标
GET /hotel/_search { "query": { "match": { "all": "上海" } }, "sort": [ { "_geo_distance": { "location": "31.21,121.5", "order": "asc", "unit": "km" } } ] }
分页
elasticsearch默认只返回top10的数据
GET /hotel/_search
{
"query": {
"match_all": {}
},
"from": 10, // 分页开始页
"size": 10 // 分页数量
}
高亮
GET /hotel/_search
{
"query": {
"match": {
"all": "如家"
}
},
"highlight": {
"fields": {
"name": {
"require_field_match": "false"
}
}
}
}
@Test public void query() throws IOException { // 创建request对象 SearchRequest searchRequest = new SearchRequest("hotel"); // 构造DSL语句 // searchRequest.source().query(QueryBuilders.matchAllQuery()); searchRequest.source().query(QueryBuilders.matchQuery("all" , "上海")) .sort("price" , SortOrder.ASC) .highlighter(new HighlightBuilder().field("name").requireFieldMatch(false)) .from(0) .size(50); // 发送请求 SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println("查询结果总条数为:" + response.getHits().getTotalHits() + "条"); System.out.println("高亮部分为:" + response.getHits().getHits()[0].getHighlightFields()); List<HotelDoc> hotelDocList = parseRestClintResponse(response, HotelDoc.class); hotelDocList.stream().forEach(System.out :: println); } private <T> List<T> parseRestClintResponse(SearchResponse response , Class<T> clazz) { SearchHits hits = response.getHits(); ArrayList<T> list = new ArrayList<>(); SearchHit[] hitsArray = hits.getHits(); Arrays.stream(hitsArray).forEach( item -> list.add(JSON.parseObject(item.getSourceAsString() , clazz))); return list; }
什么是聚合?
聚合的分类
示例:
DSL实现Bucket聚合
GET /hotel/_search { "query": { "range": { "price": { "lte": 500 } } }, "size" : 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 100, "order": { "_count": "asc" } } } } }
DSL实现Metrics
GET /hotel/_search { "size": 0, "aggs": { "brandAggs": { "terms": { "field": "brand", "size": 30, "order": { "score_aggs.avg": "desc" } }, "aggs": { "score_aggs": { "stats": { "field": "score" } } } } } }
/** * 聚合查询 */ @Test public void queryAggregation() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source().size(0); request.source().aggregation(AggregationBuilders .terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("scoreStats.avg" , false)) .subAggregation(AggregationBuilders.stats("scoreStats") .field("score"))); SearchResponse response = client.search(request, RequestOptions.DEFAULT); Aggregations aggregations = response.getAggregations(); Terms brandAgg = aggregations.get("brandAgg"); List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); buckets.stream().forEach( item -> { System.out.print(item.getKeyAsString()); Aggregations aggregations1 = item.getAggregations(); List<Aggregation> aggregations2 = aggregations1.asList(); Aggregation aggregation = aggregations2.get(0); // 解析出aggregation内的内容 if (aggregation instanceof Stats) { Stats stats = (Stats) aggregation; double avgScore = stats.getAvg(); double minScore = stats.getMin(); double maxScore = stats.getMax(); long count = stats.getCount(); System.out.println("平均分:" + avgScore + ", 最低分:" + minScore + ", 最高分:" + maxScore + ", 总数:" + count); } } ); }
自定义分词器
// 酒店数据索引库 PUT /hotel { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } }
// 自动补全查询
POST /hotel/_search
{
"suggest": {
"suggestions": {
"text": "sd",
"completion": {
"field": "suggestion",
"skip_duplicates": true,
"size": 10
}
}
}
}
/** * 自动补全 */ @Test public void autoComplete() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source() .suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders .completionSuggestion("suggestion") .size(10) .prefix("sd") )); SearchResponse response = client.search(request, RequestOptions.DEFAULT); Suggest suggest = response.getSuggest(); CompletionSuggestion suggestion = suggest.getSuggestion("suggestions"); List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions(); options.stream().forEach(item -> { System.out.println(item.getHit()); }); }
数据同步的几种实现方式
利用MQ实现mysql与elasticsearch数据同步
定义常量类
public class MqConstant {
// 交换机
public static final String HOTEL_EXCHANGE_TOPIC = "hotel.exchange.topic";
// update队列
public static final String HOTEL_UPDATE_QUEUE = "hotel.update";
// delete队列
public static final String HOTEL_DELETE_QUEUE = "hotel.delete";
}
在消费者微服务中声明exchange、queue、RoutingKey(rabbitMQ懒加载,只有消费者在监听,不监听不会创建交换机队列等)
@Configuration public class MqConfig { // 交换机 @Bean public TopicExchange topicExchange() { return new TopicExchange(MqConstant.HOTEL_EXCHANGE_TOPIC , true , false); } // 更新队列 @Bean(name = "updateQueue") public Queue updateQueue(){ return new Queue(MqConstant.HOTEL_UPDATE_QUEUE , true); } // 删除队列 @Bean(name = "deleteQueue") public Queue deleteQueue(){ return new Queue(MqConstant.HOTEL_DELETE_QUEUE , true); } // 绑定队列交换机 @Bean public Binding bindingUpdateQueue(@Qualifier("updateQueue") Queue updateQueue , TopicExchange topicExchange){ return BindingBuilder.bind(updateQueue).to(topicExchange).with(MqConstant.HOTEL_UPDATE_QUEUE); } // 绑定队列交换机 @Bean public Binding bindingDeleteQueue(@Qualifier("deleteQueue") Queue deleteQueue , TopicExchange topicExchange){ return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstant.HOTEL_DELETE_QUEUE); } }
生产者发布
@RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id); } }
消费者监听
@Component public class HotelListener { @Autowired private IHotelService hotelService; // 更新监听 @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE) public void updateListen(Long id) { System.out.println("更新es文档..."); hotelService.updateEs(id); } // 删除监听 @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void deleteListen(Long id) { System.out.println("删除es文档..."); hotelService.deleteEs(id); } }
nt.HOTEL_DELETE_QUEUE);
}
}
```
生产者发布
@RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id); } }
消费者监听
@Component public class HotelListener { @Autowired private IHotelService hotelService; // 更新监听 @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE) public void updateListen(Long id) { System.out.println("更新es文档..."); hotelService.updateEs(id); } // 删除监听 @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void deleteListen(Long id) { System.out.println("删除es文档..."); hotelService.deleteEs(id); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。