嘘~ 正在从服务器偷取页面 . . .

个人项目开发日志


1. MinIO

对于 OSS 存储,最常用的就是阿里云存储,但是我们也可以使用免费的 MinIO 在服务器上搭建自己的 OSS 存储服务。

这里是 Java SpringBoot 中所需要的 MinIO 的依赖:

<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.2.1</version>
</dependency>

1.1 配置单个容器

因为中文文档过老的缘故,建议查看英文最新文档。这里我们使用 Docker 搭建单个 MinIO 的容器:

docker run \
  -p 9000:9000 \
  -p 9001:9001 \
  --name minio1 \
  -e "MINIO_ROOT_USER=username" \
  -e "MINIO_ROOT_PASSWORD=password" \
  -v ~/minio/data:/data \
  minio/minio server /data --console-address ":9001"

1.2 SpringBoot 整合 MinIO

SpringBoot yaml 配置如下:

minio:
  endpoint: http://localhost:9000
  accessKey: root
  secretKey: sast_forever_minio
  bucketName: exam

将 yaml 配置的属性绑定到实体类中,将各个名称对应:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
//绑定 yaml 配置文档的属性
@ConfigurationProperties(prefix = "minio")
public class MinioYamlConfig {
    private String endpoint;

    private String accessKey;

    private String secretKey;

    private String bucketName;
}

配置好之后,我们就可以自动创建 MinIO 客户端的连接:

import io.minio.MinioClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
// 指定组件生效,并表明所在类
@EnableConfigurationProperties(MinioYamlConfig.class)
public class MinioConfig {
    
    // 使用组件扫描注入,不需要再配置
    @Resource
    private MinioYamlConfig minioYamlConfig;

    // 创建 minio 客户端连接
    @Bean
    public MinioClient minioClient() {
        return MinioClient.builder()
                .endpoint(minioYamlConfig.getEndpoint())
                .credentials(minioYamlConfig.getAccessKey(), minioYamlConfig.getSecretKey())
                .build();
    }
    
}

之后我们确认 MinIO 的工具类:

import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.sast.jwt.exception.LocalRuntimeException;
import io.minio.*;
import io.minio.errors.*;
import io.minio.http.Method;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
@Component
public class MinioUtil {

    private final MinioClient minioClient;

    public MinioUtil(MinioClient minioClient) {
        this.minioClient = minioClient;
    }

    //判断是否存在对应的桶,如果没有就创建一个
    public void existBucket(String name) {
        try {
            boolean exists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(name).build());
            if (!exists) {
                minioClient.makeBucket(MakeBucketArgs.builder().bucket(name).build());
            }
        } catch (Exception e) {
            throw new LocalRuntimeException("创建桶失败");
        }
    }

    //上传文件,返回文件名(文件为数组的形式)
    public List<String> upload(MultipartFile[] multipartFile, String bucketName) {
        List<String> names = new ArrayList<>(multipartFile.length);
        for (MultipartFile file : multipartFile) {
            String fileName = file.getOriginalFilename();
            //使用 hutools 的判断,文件名为空或者 null 都会抛出异常
            if (StringUtils.isEmpty(fileName)) {
                throw new LocalRuntimeException("文件名为空");
            }
            String[] split = fileName.split("\\.");
            if (split.length > 1) {
                fileName = System.currentTimeMillis() + "_" + split[0] + "." + split[split.length - 1];
            } else {
                fileName = System.currentTimeMillis() + fileName;
            }
            InputStream in = null;
            try {
                in = file.getInputStream();
                minioClient.putObject(PutObjectArgs.builder()
                        .bucket(bucketName)
                        .object(fileName)
                        //在 partSize 填入-1时表示大小不确定
                        .stream(in, in.available(), -1)
                        .contentType(file.getContentType())
                        .build()
                );
            } catch (Exception e) {
                log.error(e.getMessage());
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        log.error(e.getMessage());
                    }
                }
            }
            names.add(fileName);
        }
        return names;
    }


    public ResponseEntity<byte[]> download(String fileName, String bucketName) {
        ResponseEntity<byte[]> responseEntity = null;
        InputStream in = null;
        ByteArrayOutputStream out = null;
        try {
            in = minioClient.getObject(GetObjectArgs.builder()
                    .bucket(bucketName).object(fileName).build());
            out = new ByteArrayOutputStream();
            IOUtils.copy(in, out);
            //封装返回值
            byte[] bytes = out.toByteArray();
            HttpHeaders headers = new HttpHeaders();
            try {
                headers.add("Content-Disposition",
                        "attachment;filename=" + URLEncoder.encode(fileName, Constants.UTF_8));
            } catch (UnsupportedEncodingException e) {
                log.error(e.getMessage());
            }
            headers.setContentLength(bytes.length);
            headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
            headers.setAccessControlExposeHeaders(Collections.singletonList("*"));
            responseEntity = new ResponseEntity<>(bytes, headers, HttpStatus.OK);
        } catch (Exception e) {
            log.error(e.getMessage());
        } finally {
            try {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (out != null) {
                    out.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return responseEntity;
    }

    //删除对应桶的对应文件
    public void removeObject(String bucketName, String objectName) {
        try {
            RemoveObjectArgs objectArgs = RemoveObjectArgs.builder().object(objectName)
                    .bucket(bucketName).build();
            minioClient.removeObject(objectArgs);
        } catch (Exception e) {
            throw new LocalRuntimeException("文件删除失败");
        }
    }

    //获取文件外链
    public String getObjectURL(String bucketName, String objectName, Integer expires) {
        try {
            GetPresignedObjectUrlArgs objectArgs = GetPresignedObjectUrlArgs.builder().object(objectName)
                    .bucket(bucketName)
                    .expiry(expires).build();
            String url = minioClient.getPresignedObjectUrl(objectArgs);
            return URLDecoder.decode(url, "UTF-8");
        } catch (Exception e) {
            log.error("文件路径获取失败" + e.getMessage());
        }
        return null;
    }

    //获取文件流
    public InputStream getMinioFile(String bucketName,String objectName){
        InputStream inputStream = null;
        try {
            GetObjectArgs objectArgs = GetObjectArgs.builder().object(objectName)
                    .bucket(bucketName).build();
            inputStream = minioClient.getObject(objectArgs);
        } catch (Exception e) {
            log.error("文件获取失败" + e.getMessage());
        }
        return inputStream;
    }

    /**
     * 获取某一个存储对象的下载链接
     *
     * @param bucketName 桶名
     * @param method     方法类型
     * @param objectName 对象名
     * @return url 下载链接
     * @throws ServerException 服务异常
     */
    public String getObjectUrl(String bucketName, Method method, String objectName)
            throws ServerException, InsufficientDataException,
            ErrorResponseException, IOException, NoSuchAlgorithmException,
            InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
        return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
                .method(method)
                .bucket(bucketName)
                .object(objectName).build());
    }

}

1.3 使用 Compose 配置分布式 MinIO

我们也可以使用官方自带的 docker-compose.yaml 和 nginx.conf 搭建 MinIO 环境:

version: '3.7'

# Settings and configurations that are common for all containers
x-minio-common: &minio-common
  image: quay.io/minio/minio:RELEASE.2022-04-01T03-41-39Z
  command: server --console-address ":9001" http://minio{1...4}/data{1...2}
  expose:
    - "9000"
    - "9001"
  environment:
    MINIO_ROOT_USER: root
    MINIO_ROOT_PASSWORD: sast_forever
    #如果不指定,用户名和密码都是 minioadmin
    #需要注意的是,这里的用户名至少为3位,密码至少为8位
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
    interval: 30s
    timeout: 20s
    retries: 3

# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
  minio1:
    <<: *minio-common
    hostname: minio1
    volumes:
      - data1-1:/data1
      - data1-2:/data2

  minio2:
    <<: *minio-common
    hostname: minio2
    volumes:
      - data2-1:/data1
      - data2-2:/data2

  minio3:
    <<: *minio-common
    hostname: minio3
    volumes:
      - data3-1:/data1
      - data3-2:/data2

  minio4:
    <<: *minio-common
    hostname: minio4
    volumes:
      - data4-1:/data1
      - data4-2:/data2

  nginx:
    image: nginx:1.19.2-alpine
    hostname: nginx
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    ports:
      - "9000:9000"
      - "9001:9001"
    depends_on:
      - minio1
      - minio2
      - minio3
      - minio4

## By default this config uses default local driver,
## For custom volumes replace with volume driver configuration.
volumes:
  data1-1:
  data1-2:
  data2-1:
  data2-2:
  data3-1:
  data3-2:
  data4-1:
  data4-2:

nginx.conf 如下:

user  nginx;
worker_processes  auto;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    worker_connections  4096;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    keepalive_timeout  65;

    # include /etc/nginx/conf.d/*.conf;

    upstream minio {
        server minio1:9000;
        server minio2:9000;
        server minio3:9000;
        server minio4:9000;
    }

    upstream console {
        ip_hash;
        server minio1:9001;
        server minio2:9001;
        server minio3:9001;
        server minio4:9001;
    }

    server {
        listen       9000;
        listen  [::]:9000;
        server_name  localhost;

        # To allow special characters in headers
        ignore_invalid_headers off;
        # Allow any size file to be uploaded.
        # Set to a value such as 1000m; to restrict file size to a specific value
        client_max_body_size 0;
        # To disable buffering
        proxy_buffering off;
        proxy_request_buffering off;

        location / {
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            proxy_connect_timeout 300;
            # Default is HTTP/1, keepalive is only enabled in HTTP/1.1
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            chunked_transfer_encoding off;

            proxy_pass http://minio;
        }
    }

    server {
        listen       9001;
        listen  [::]:9001;
        server_name  localhost;

        # To allow special characters in headers
        ignore_invalid_headers off;
        # Allow any size file to be uploaded.
        # Set to a value such as 1000m; to restrict file size to a specific value
        client_max_body_size 0;
        # To disable buffering
        proxy_buffering off;
        proxy_request_buffering off;

        location / {
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_set_header X-NginX-Proxy true;

            # This is necessary to pass the correct IP to be hashed
            real_ip_header X-Real-IP;

            proxy_connect_timeout 300;
            
            # To support websocket
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            
            chunked_transfer_encoding off;

            proxy_pass http://console;
        }
    }
}

1.4 问题

1.4.1 MinIO 无法启动

在 docker-compose up -d 的时候,MinIO 一直无法正常在后台启动。

在不使用后台命令的情况下发现,原来是密码的设置没有达到至少8个字符(后台启动没法看到进入容器内部的报错)。

1.4.2 新版本 MinIO 冲突问题

点开 MinIO 的pom.xml 查看 okhttp3 声明版本是 4.8.1,所以是 SpringBoot 自带的 okhttp 与 MinIO 中不同产生冲突,我们需要手动覆盖。

在父工程 pom.xml 里修改 properties 的版本号。

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <youlai.version>2.0.0</youlai.version>
    <!-- 覆盖SpringBoot中okhttp3的旧版本声明,解决MinIO 8.4.x的依赖冲突 -->
    <okhttp3.version>4.8.1 </okhttp3.version>
</properties>

2. RabbitMQ

对于 RabbitMQ 通过 Docker 进行安装,采取最新的镜像。

docker run \
-d --name mrabbitmq \
-p 5672:5672 -p 15672:15672 \
-v rabbitmqData:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=pas \
rabbitmq:latest
#左边为 api 端口,右边为 gui 端口

2.1 解决页面问题

直接启动 RabbitMQ 容器的时候,页面会无法打开。

如果查阅 RabbitMQ 官网,会发现我们缺少一个后台管理的插件,需要在 Docker 容器中执行rabbitmq-plugins enable rabbitmq_management命令执行这个插件。这样就能够使用 ip + 15672 访问后端管理页面。

3. SpringCloud

针对多模块开发,各种功能已经不会在一个项目中进行管理。此时,需要一个完整可行的 RPC 方式,来对各种模块的功能/健康进行统一管理。

  • 单体架构:简单方便,高度耦合,扩展性差,适合小型项目。例如:学生管理系统;

  • 分布式架构:松耦合,扩展性好,但架构复杂,难度大。适合大型互联网项目,例如:京东、淘宝;

  • 微服务:一种良好的分布式架构方案;

    • 优点:拆分粒度更小、服务更独立、耦合度更低;

    • 缺点:架构非常复杂,运维、监控、部署难度提高。

  • SpringCloud是微服务架构的一站式解决方案,集成了各种优秀微服务功能组件。

3.1 RestTemplate

针对外面暴露的接口 api,SpringBoot Web 已经整合 RestTemplate 来调用外部的接口。

// 可以在配置文件中自定义 config
@Configuration
public class RestTemplateConfig {
 
    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory factory){
        return new RestTemplate(factory);
    }
 
    @Bean
    public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // 自定义超时时间
        factory.setConnectTimeout(15000);
        factory.setReadTimeout(5000);
        return factory;
    }
}

// 最简单可以在启动类中直接添加 Bean
@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

RestTemplate 的大部分方法主要针对于 HTTP 请求。

  • delete() 在特定的 URL 上对资源执行 HTTP DELETE 操作;
  • exchange() 在 URL 上执行特定的 HTTP 方法,返回包含对象的 ResponseEntity,这个对象是从响应体中映射得到的;
  • execute() 在 URL 上执行特定的 HTTP 方法,返回一个从响应体映射得到的对象;
  • getForEntity() 发送一个 HTTP GET 请求,返回的 ResponseEntity 包含了响应体所映射成的对象;
  • getForObject() 发送一个 HTTP GET 请求,返回的请求体将映射为一个对象;
  • postForEntity() POST 数据到一个 URL,返回包含一个对象的 ResponseEntity,这个对象是从响应体中映射得到的;
  • postForObject() POST 数据到一个 URL,返回根据响应体匹配形成的对象;
  • headForHeaders() 发送 HTTP HEAD 请求,返回包含特定资源 URL 的 HTTP 头;
  • optionsForAllow() 发送 HTTP OPTIONS 请求,返回对特定 URL 的 Allow 头信息;
  • postForLocation() POST 数据到一个 URL,返回新创建资源的 URL;
  • put() PUT 资源到特定的 URL。
@Autowired
private RestTemplate restTemplate;

public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 利用 RestTemplate 发起 http 请求,查询用户
    // 2.1. url 路径
    String url = "http://localhost:8081/user/" + order.getUserId();
    // 2.2. 发送 http 请求,实现远程调用
    User user = restTemplate.getForObject(url, User.class);
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

orderService 调用了 userService 的方法

3.2 Eureka

  • order-service 在发起远程调用的时候,该如何得知 user-service 实例的 ip 地址和端口?
  • 有多个 user-service 实例地址,order-service 调用时该如何选择?
  • order-service 如何得知某个 user-service 实例是否依然健康,是不是已经宕机?、

针对这些问题要利用 SpringCloud 中的注册中心来解决,这里讲解 Eureka。

image-20220729230223772

3.2.1 Server

引入以下依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

在启动类中添加 @EnableEurekaServer 注解,开启 eureka 的注册中心功能。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }
}

编写一个 application.yml 文件,内容如下:

server:
  port: 10086
spring:
  application:
    name: eureka-server
eureka:
  client:
    service-url: 
      defaultZone: http://127.0.0.1:10086/eureka
      # 启动时服务端地址,后台管理

3.2.2 Consumer

在 user-service 中引入 eureka-Client 的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

在 user-service 中,修改 application.yml 文件,添加服务名称、eureka 地址:

spring:
  application:
    name: userservice
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:10086/eureka

通过 idea 配置,可以启动相同配置的多个实例,实现模拟集群的效果。

在服务中复制一份相同的配置启动项,除此之外,还需要更改运行端口,防止冲突。

复制配置

在 JVM 的配置中,可以对 yaml 文件的参数进行手动配置。

新版 idea 需要提前开启 VM 选项设置,在其中配置新的端口

在 orderService 总采取同样的配置,修改 url 内容:

@Autowired
private RestTemplate restTemplate;

public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 利用 RestTemplate 发起 http 请求,查询用户
    // 2.1. url 路径
    // String url = "http://localhost:8081/user/" + order.getUserId();
    // 在 userService 中封装了 ip + port
    String url = "http://userservice/user/" + order.getUserId();
    // 2.2. 发送 http 请求,实现远程调用
    User user = restTemplate.getForObject(url, User.class);
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

在 RestTemplate 中添加 @LoadBalanced 开启自动负载均衡模式(默认为轮询调用)。

成功注册结果显示

3.3 Ribbon

在 SpringCloud 中,添加 @LoadBalanced 注解,就能够自动帮助实现负载均衡。原因是 SpringCould 底层内置了 Ribbon。除此,Ribbon 将配置中的服务名自动转发到对应 IP + port。

负载均衡底层

3.3.1 源码分析

在 LoadBalancerInterceptor 中,Ribbon 帮我们拦截发起的请求,并在其中进行解析。

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;

	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
			LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null,
				"Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName,
				this.requestFactory.createRequest(request, body, execution));
	}
}
  • request.getURI():获取请求 uri;
  • originalUri.getHost():获取 uri 路径的主机名,其实就是服务 id;
  • this.loadBalancer.execute():处理服务 id,和用户请求。

获取到服务对应的 uri

在 execute 调用 RibbonLoadBalancerClient 中的重载方法。

  • getLoadBalancer(serviceId):根据服务 id 获取 ILoadBalancer,而 ILoadBalancer 会拿着服务 id 去 eureka 中获取服务列表并保存;
  • getServer(loadBalancer):利用内置的负载均衡算法,从服务列表中选择一个。

image-20220801131355311

在 Ribbon 中,默认的负载均衡的规则是轮询模式。

RoudRobinRule

3.3.2 修改负载均衡规则

Ribbon 提供了内置负载均衡的规则。

各种规则的继承链

内置负载均衡规则类 规则描述
RoundRobinRule 简单轮询服务列表来选择服务器。它是Ribbon默认的负载均衡规则。
AvailabilityFilteringRule 对以下两种服务器进行忽略:
(1)在默认情况下,这台服务器如果3次连接失败,这台服务器就会被设置为“短路”状态。短路状态将持续30秒,如果再次连接失败,短路的持续时间就会几何级地增加。
(2)并发数过高的服务器。如果一个服务器的并发连接数过高,配置了AvailabilityFilteringRule规则的客户端也会将其忽略。并发连接数的上限,可以由客户端的 <clientName>.<clientConfigNameSpace>.ActiveConnectionsLimit 属性进行配置。
WeightedResponseTimeRule 为每一个服务器赋予一个权重值。服务器响应时间越长,这个服务器的权重就越小。这个规则会随机选择服务器,这个权重值会影响服务器的选择
ZoneAvoidanceRule 以区域可用的服务器为基础进行服务器的选择。使用Zone对服务器进行分类,这个Zone可以理解为一个机房、一个机架等。而后再对Zone内的多个服务做轮询
BestAvailableRule 忽略那些短路的服务器,并选择并发数较低的服务器
RandomRule 随机选择一个可用的服务器
RetryRule 重试机制的选择逻辑
  1. 在配置类中,重新定义 Rule 的 Bean;

    @Bean
    public IRule myRule(){
        return new RandomRule();
    }
  2. 在 yaml 配置文件中修改定义。

    userservice: # 给某个微服务配置负载均衡规则,这里是 userservice 服务
      ribbon:
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则 

总结 Ribbon 拦截解析

3.3.3 饥饿加载

Ribbon 默认是采用懒加载,即第一次访问时才会去创建 LoadBalanceClient,请求时间会很长。

而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:

ribbon:
  eager-load:
    enabled: true
    clients: userservice

3.4 Nacos

SpringCloudAlibaba 推出了 Nacos 的注册中心

在父工程的 pom 文件中的 <dependencyManagement> 中引入 SpringCloudAlibaba 的依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.6.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

在子模块中引入 nacos-discovery 依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

在 yaml 文件中配置 nacos 服务地址,就可以正常使用:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848

oderservice 1个 实例,userservice 2个 实例

3.4.1 服务分级存储模型

Nacos 就将同一机房内的实例 划分为一个集群

image-20220802132152943

微服务互相访问时,应该尽可能访问同集群实例,因为本地访问速度更快。当本集群内不可用时,才访问其它集群。

image-20220802132158696

在 yaml 文件中配置实例所属集群:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ # 集群名称

重新复制一份配置,更改集群参数:

cluster-name 现在是 SH

两个不同集群

3.4.2 集群负载均衡&权重比

默认的 ZoneAvoidanceRule 并不能实现根据同集群优先来实现负载均衡。

在 orderservice 中还是需要通过 Ribbon 更改负载均衡规则:

userservice:
  ribbon:
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则 

而对于服务访问的权重,可以直接在控制台的进行修改(0 表示不会被访问)。

对于多环境管理,nacos 使用 namespace 将服务放置在不同的环境中。

  • nacos 中可以有多个 namespace;
  • namespace下可以有 group、service 等;
  • 不同 namespace 之间相互隔离,例如不同 namespace 的服务互相不可见。

关系图

通过对应配置完成 namespace 归属:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ
        namespace: 492a7d5d-237b-46a1-a99a-fa8e98e4b0f9 # 命名空间,填ID

创建时分配唯一的 id

3.4.3 eureka 和 nacos

Nacos的服务实例分为两种l类型:

  • 临时实例:如果实例宕机超过一定时间,会从服务列表剔除,默认的类型。

  • 非临时实例:如果实例宕机,不会从服务列表剔除,也可以叫永久实例。

配置一个服务实例为永久实例:

spring:
  cloud:
    nacos:
      discovery:
        ephemeral: false # 设置为非临时实例
  • Nacos与eureka的共同点

    • 都支持服务注册和服务拉取
    • 都支持服务提供者心跳方式做健康检测
  • Nacos与Eureka的区别

    • Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
    • 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
    • Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
    • Nacos 集群默认采用 AP 方式,当集群中存在非临时实例时,采用 CP 模式;Eureka 采用 AP 方式

nacos 处理请求方式

CP:我们服务可以不能用,但必须要保证数据的一致性。

AP: 数据可以短暂不一致,但最终是需要一致的,无论如何都要保证服务的可用。

只取舍:有在 CP 和 AP 选择一个平衡点,大多数都是选择 AP 模式。

3.4.4 nacos 配置管理

在 nacos 后台中配置对应文件,应用至对应环境中,可以参考 maven 的多环境配置。

ID 跟 Spring 中的命名一致

在 SpringBoot 中需要引入 bootstrap.yaml 文件,读取 nacos 中的配置,会在 application.yaml 之前被读取。

加载流程

引入 nacos-config 依赖:

<!--nacos配置管理依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

添加 bootstrap.yaml:

spring:
  application:
    name: orderservice # 服务名称
  profiles:
    active: dev #开发环境,这里是dev 
  cloud:
    nacos:
      server-addr: localhost:8848 # Nacos地址
      config:
        file-extension: yaml # 文件后缀名

会根据 spring.cloud.nacos.server-addr 获取 nacos 地址,再根据${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}作为文件id,来读取配置。

在 SpringBoot 中,可以像操控本地配置文件一样,使用 @Value 进行读取。

@Value("${pattern.dateformat}")
private String dateformat;

@GetMapping("{orderId}")
public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
    // 根据id查询订单并返回
    return orderService.queryOrderById(orderId);
}
配置热更新
  • 在对应 Controller 中添加 @RefreshScope

  • 通过 Spring 中的 @ConfigurationProperties 将配置内容注入 Bean 类中;

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    @Component
    @Data
    @ConfigurationProperties(prefix = "pattern")
    public class PatternProperties {
        private String dateformat;
    }

    两种方式选择一种即可。

配置共享

微服务启动时,nacos 会读取多个配置文件,例如:

  • [spring.application.name]-[spring.profiles.active].yaml,例如:userservice-dev.yaml;

  • [spring.application.name].yaml,例如:userservice.yaml。

[spring.application.name].yaml 不包含环境,因此可以被多个环境共享。

跟 Spring 一致,如果不加 dev 等 profile,代表共享配置。

共享配置优先级

3.4.5 搭建 nacos

Windows

阿里的仓库找到 nacos,下载对应的 nacos 文件。

对于单节点启动,不需要进行配置,在 bin 目录中使用 .\startup.cmd -m standalone (默认是集群模式)即可启动。

对于集群模式,需要我们提前配置集群的 ip + 端口

127.0.0.1:8841
127.0.0.1:8844
127.0.0.1:8847

对于集群的端口,需要提前预留,在 nacos 连续占用会出现问题。

配置文件中打开 mysql 的连接

复制三份 nacos 文件,需要修改 properties 中的 port

# 配置对应负载均衡
upstream nacos-cluster {
    server 127.0.0.1:8845;
    server 127.0.0.1:8846;
    server 127.0.0.1:8847;
}

#gzip  on;
server {
    listen       10000;
    server_name  localhost;

    location /nacos {
        proxy_pass http://nacos-cluster;
    }
}
Docker

官方 Docker Hub 中有对 Docker 启动的样例文件,但可惜有部分错误。

version: "3"
services:
  nacos1:
    hostname: nacos1
    container_name: nacos1
    image: nacos/nacos-server:${NACOS_VERSION}
    volumes:
      - ./cluster-logs/nacos1:/home/nacos/logs
      - ./init.d/custom.properties:/home/nacos/init.d/custom.properties
    ports:
      - "8848:8848"
      - "9848:9848"
      - "9555:9555"
    env_file:
      - ../env/nacos-hostname.env
    restart: always
    depends_on:
      - mysql

  nacos2:
    hostname: nacos2
    image: nacos/nacos-server:${NACOS_VERSION}
    container_name: nacos2
    volumes:
      - ./cluster-logs/nacos2:/home/nacos/logs
      - ./init.d/custom.properties:/home/nacos/init.d/custom.properties
    ports:
      - "8849:8848"
      - "9849:9848"
    env_file:
      - ../env/nacos-hostname.env
    restart: always
    depends_on:
      - mysql

  nacos3:
    hostname: nacos3
    image: nacos/nacos-server:${NACOS_VERSION}
    container_name: nacos3
    volumes:
      - ./cluster-logs/nacos3:/home/nacos/logs
      - ./init.d/custom.properties:/home/nacos/init.d/custom.properties
    ports:
      - "8850:8848"
      - "9850:9848"
    env_file:
      - ../env/nacos-hostname.env
    restart: always
    depends_on:
      - mysql
      
  mysql:
    container_name: mysql
    # 这里官方默认是 5 版本,但不兼容 nacos 2.1
    image: nacos/nacos-mysql:8.0.16
    env_file:
      - ../env/mysql.env
    volumes:
      - ./mysql:/var/lib/mysql
    ports:
      - "10002:3306"

MySQL 的 env 如下,可以对参数进行修改:

MYSQL_ROOT_PASSWORD=root
MYSQL_DATABASE=nacos_devtest
MYSQL_USER=nacos
MYSQL_PASSWORD=nacos

nacos 的 env 如下:

#nacos dev env
PREFER_HOST_MODE=hostname
NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848
MYSQL_SERVICE_HOST=mysql
MYSQL_SERVICE_DB_NAME=nacos_devtest
MYSQL_SERVICE_PORT=3306
MYSQL_SERVICE_USER=nacos
MYSQL_SERVICE_PASSWORD=nacos

在 docker-compose 启动的过程中,需要先保证创建出 MySQL 容器。如果第一次失败可以重启,保证数据库成功绑定。

而对于这种 cluster 集群来说,会出现无法配置参数的问题,这是因为两个表缺失了两个字段,补齐之后就可以正常使用。

3.5 Feign

Feign 是一个声明式的 http 客户端,用于替代 RestTemplate 直接使用 URL 的形式。

这个客户端主要是基于 SpringMVC 的注解来声明远程调用的信息,比如:

  • 服务名称:userservice
  • 请求方式:GET
  • 请求路径:/user/{id}
  • 请求参数:Long id
  • 返回值类型:User

这样,Feign 就可以帮助我们发送 http 请求,无需自己使用 RestTemplate 来发送了。

在 orderservice 服务的pom文件中引入 feign 的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

编写 RPC 接口程序:

import cn.itcast.order.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient("userservice")
public interface UserClient {
    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

在 orderservice 中,用 feign 替代 RestTemplate:

@Autowired
private UserClient userClient;

public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 用 Feign 远程调用
    User user = userClient.findById(order.getUserId());
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

3.5.1 自定义配置

Feign可以支持很多的自定义配置,如下表所示:

类型 作用 说明
feign.Logger.Level 修改日志级别 包含四种不同的级别:NONE、BASIC、HEADERS、FULL
feign.codec.Decoder 响应结果的解析器 http 远程调用的结果做解析,例如解析 json 字符串为 Java 对象
feign.codec.Encoder 请求参数编码 将请求参数编码,便于通过http请求发送
feign.Contract 支持的注解格式 默认是 SpringMVC 的注解
feign.Retryer 失败重试机制 请求失败的重试机制,默认是没有,不过会使用 Ribbon 的重试

这些配置一般采用默认,自定义可以像之前,采用 yaml 或者 Java 代码 两种配置形式。

yaml

基于配置文件修改 feign 的日志级别可以针对单个服务:

feign:  
  client:
    config: 
      userservice: # 针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

也可以针对所有服务:

feign:  
  client:
    config: 
      default: # 这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

而日志的级别分为四种:

  • NONE:不记录任何日志信息,这是默认值;
  • BASIC:仅记录请求的方法,URL 以及响应状态码和执行时间;
  • HEADERS:在 BASIC 的基础上,额外记录了请求和响应的头信息;
  • FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。
Java 代码

先声明一个类,然后声明一个Logger.Level的对象:

public class DefaultFeignConfiguration  {
    @Bean
    public Logger.Level feignLogLevel(){
        return Logger.Level.BASIC; // 日志级别为 BASIC
    }
}

如果要全局生效,将其放到启动类的 @EnableFeignClients 这个注解中:

@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration.class) 

如果是局部生效,则把它放到对应的 @FeignClient 这个注解中:

@FeignClient(value = "userservice", configuration = DefaultFeignConfiguration.class) 

3.5.2 使用优化

Feign 底层发起 http 请求,依赖于其它的框架。其底层客户端实现包括:

•URLConnection:默认实现,不支持连接池;

•Apache HttpClient :支持连接池;

•OKHttp:支持连接池。

因此提高 Feign的性能主要手段就是使用连接池代替默认的 URLConnection。

在 orderservice 的 pom 文件中引入 Apache 的 HttpClient 依赖:

<!--httpClient的依赖 -->
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-httpclient</artifactId>
</dependency>

2)配置连接池

在 orderservice 的 application.yml 中添加配置:

feign:
  client:
    config:
      default: # default全局的配置
        loggerLevel: BASIC # 日志级别,BASIC就是基本的请求和响应信息
  httpclient:
    enabled: true # 开启feign对HttpClient的支持
    max-connections: 200 # 最大的连接数
    max-connections-per-route: 50 # 每个路径的最大连接数

3.5.3 抽取 api

针对模块中重复的接口调用,可以将共同的 api 提取出额外的接口,之后在进行导入。

继承解决法,无法继承注解映射,不推荐使用

将 UserClient、User、Feign 的默认配置都抽取到一个 feign-api 包中,所有微服务引用该依赖包,即可直接使用。

抽取 api 示例图

在 feign-api 中然后引入 feign 的 starter 依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

将公用的类抽取放入 feign-api 中。

在 orderservice 中引入对应依赖。UserClient 现在在cn.itcast.feign.clients 包下,而 orderservice 的 @EnableFeignClients 注解是在 cn.itcast.order 包下,不在同一个包,无法扫描到 UserClient。

方式一:

指定 Feign 应该扫描的包:

@EnableFeignClients(basePackages = "cn.itcast.feign.clients")

方式二:

指定需要加载的 Client 接口:

@EnableFeignClients(clients = {UserClient.class})

3.6 Gateway

网关的核心功能特性

  • 请求路由;
  • 权限控制;
  • 限流。

Gateway 架构图

在创建 Gateway 模块后,引入以下依赖:

<!--网关-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos服务发现依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
# 配置对应 yaml,将网关匹配到对应 service
server:
  port: 10010
logging:
  level:
    cn.itcast: debug
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  application:
    name: gateway
  cloud:
    nacos:
      server-addr: localhost:8848 # nacos地址
    gateway:
      routes:
        - id: userservice # 路由标示,必须唯一
          uri: lb://userservice # 路由的目标地址
          predicates: # 路由断言,判断请求是否符合规则
            - Path=/user/** # 路径断言,判断路径是否是以 /user 开头,如果是则符合
        - id: orderservice
          uri: lb://orderservice
          predicates:
            - Path=/order/**

整个网关流程图

路由配置包括:

  1. 路由 id:路由的唯一标示

  2. 路由目标(uri):路由的目标地址,http 代表固定地址,lb 代表根据服务名负载均衡

  3. 路由断言(predicates):判断路由的规则,

  4. 路由过滤器(filters):对请求或响应做处理

3.6.1 断言工厂

我们在配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件

例如Path=/user/**是按照路径匹配,这个规则是由

org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory类来

处理的,像这样的断言工厂在SpringCloudGateway还有十几个:

名称 说明 示例
After 是某个时间点后的请求 - After=2037-01-20T17:42:47.789-07:00[America/Denver]
Before 是某个时间点之前的请求 - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai]
Between 是某两个时间点之前的请求 - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver]
Cookie 请求必须包含某些cookie - Cookie=chocolate, ch.p
Header 请求必须包含某些header - Header=X-Request-Id, \d+
Host 请求必须是访问某个host(域名) - Host=.somehost.org,.anotherhost.org
Method 请求方式必须是指定方式 - Method=GET,POST
Path 请求路径必须符合指定规则 - Path=/red/{segment}, /blue/**
Query 请求参数必须包含指定参数 - Query=name, Jack 或者 - Query=name
RemoteAddr 请求者的ip必须是指定范围 - RemoteAddr=192.168.1.1/24
Weight 权重处理

3.6.2 过滤工厂

Spring 提供了 31种不同的路由过滤器工厂。例如:

名称 说明
AddRequestHeader 给当前请求添加一个请求头
RemoveRequestHeader 移除请求中的一个请求头
AddResponseHeader 给响应结果中添加一个响应头
RemoveResponseHeader 从响应结果中移除有一个响应头
RequestRateLimiter 限制请求的流量

只需要修改 Gateway 服务的 application.yaml 文件,添加路由过滤即可:

spring:
  cloud:
    gateway:
      routes:
      - id: userservice 
        uri: lb://userservice 
        predicates: 
        - Path=/user/** 
        filters: # 过滤器
        - AddRequestHeader=Truth, Itcast is freaking awesome! # 添加请求头

当前过滤器写在 userservice 路由下,因此仅仅对访问 userservice 的请求有效。

如果要对所有的路由都生效,则可以将过滤器工厂写到 default 下。格式如下:

spring:
  cloud:
    gateway:
      routes:
      - id: userservice 
        uri: lb://userservice 
        predicates: 
        - Path=/user/**
      default-filters: # 默认过滤项
      - AddRequestHeader=Truth, Itcast is freaking awesome! 
全局过滤器

全局过滤器的作用也是处理一切进入网关的请求和微服务响应,与 GatewayFilter 的作用一样。区别在于 GatewayFilter 通过配置定义,处理逻辑是固定的;而 GlobalFilter的逻辑需要自己写代码实现。

定义方式是实现 GlobalFilter 接口:

public interface GlobalFilter {
    /**
     *  处理当前请求,有必要的话通过{@link GatewayFilterChain}将请求交给下一个过滤器处理
     *
     * @param exchange 请求上下文,里面可以获取Request、Response等信息
     * @param chain 用来把请求委托给下一个过滤器 
     * @return {@code Mono<Void>} 返回标示当前过滤器业务结束
     */
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

在 filter 中编写自定义逻辑,可以实现下列功能:

  • 登录状态判断;
  • 权限校验;
  • 请求限流等。
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

// @Order(-1)
@Component
public class AuthorizeFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 1. 获取请求参数
        ServerHttpRequest request = exchange.getRequest();
        MultiValueMap<String, String> params = request.getQueryParams();
        // 2. 获取参数中的 authorization 参数
        String auth = params.getFirst("authorization");
        // 3. 判断参数值是否等于 admin
        if ("admin".equals(auth)) {
            // 4.是,放行
            return chain.filter(exchange);
        }
        // 5. 否,拦截
        // 5.1.设 置状态码
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
        // 5.2. 拦截请求
        return exchange.getResponse().setComplete();
    }

    @Override
    public int getOrder() {
        return -1;
    }
}

添加对应 parameter 之后才可访问

执行顺序

请求进入网关会碰到三类过滤器:当前路由的过滤器、DefaultFilter、GlobalFilter

请求路由后,会将当前路由过滤器和 DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中,排序后依次执行每个过滤器:

过滤器执行顺序

  • 每一个过滤器都必须指定一个 int 类型的 order 值,order 值越小,优先级越高,执行顺序越靠前(可以为负数)。
  • GlobalFilter 通过实现 Ordered 接口,或者添加 @Order 注解来指定 order 值,由我们自己指定
  • 路由过滤器和 defaultFilter 的 order 由 Spring 指定,默认是按照声明顺序从 1 递增。
  • 当过滤器的 order 值一样时,会按照 defaultFilter > 路由过滤器 > GlobalFilter 的顺序执行。

3.6.3 跨域问题

在 Gateway 服务的 application.yml 文件中,添加下面的配置:

spring:
  cloud:
    gateway:
      globalcors: # 全局的跨域处理
        add-to-simple-url-handler-mapping: true # 解决 options 请求被拦截问题
        corsConfigurations:
          '[/**]':
            allowedOrigins: # 允许哪些网站的跨域请求 
              - "http://localhost:8090"
            allowedMethods: # 允许的跨域ajax 的请求方式
              - "GET"
              - "POST"
              - "DELETE"
              - "PUT"
              - "OPTIONS"
            allowedHeaders: "*" # 允许在请求中携带的头信息
            allowCredentials: true # 是否允许携带cookie
            maxAge: 360000 # 这次跨域检测的有效期

4. I/O 模型

Java 支持 3种网络编程模型(使用什么样的通道进行数据的发送和接收):BIO、NIO、AIO。

4.1 BIO

Java BIO同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不作任何事情会造成不必要的线程开销。

服务器对于每一个客户端的连接都会分配一个线程处理,为了避免资源消耗过多,采用了线程池模型(创建一个固定大小的线程池,如果有客户端请求,就从线程池中取一个空闲线程来处理,当客户端处理完操作之后,就会释放对线程的占用)。但如果有过多长时间处理的线程,依然会导致没有空余线程。

BIO 底层实现

4.2 NIO

Java NIO同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求会被注册到多路复用器上,多路复用器轮询到有 I/O 请求就会进行处理。

Selector 注册处理各种 I/O 事件,分配一个线程轮询每个客户端是否有事件发生,有事件发生时,顺序处理每个事件,当所有事件处理
完之后,便再转去继续轮询。

NIO 底层实现

在 NIO 中,当一个连接创建后,不需要对应一个线程,连接会被注册到多路复用器上面,一个选择器线程可以同时处理成千上万个连接,系统不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。

4.2.1 Reactor 模式

传统处理模式:

传统 I/O 服务模型

  • 采用阻塞I/O模式获取输入数据;

  • 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。

React 处理模式:

img

Reactor 模式称为反应器模式或应答者模式,拥有一个或多个并发输入源,有一个服务处理器和多个请求处理器,服务处理器会同步的将输入的请求事件以多路复用的方式分发给相应的请求处理器。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有请求,然后将请求派发到相关的工作线程并进行处理的过程。

Reactor 包含三种模式:

  • 单 Reactor 单线程;
  • 单 Reactor 多线程;
  • 主从 Reactor 多线程。

4.2.2 结构

NIO 通过 Channels、Buffers、Selectors 组成。

Channel 和 IO 中的 Stream 近似一个等级的。但 Stream 是单向的,譬如:InputStream, OutputStream。而 Channel 是双向的,既可以用来进行读操作,又可以用来进行写操作。

Channel:FileChannel、DatagramChannel、SocketChannel 和 ServerSocketChannel(对应 IO、UDP、TCP(Server 和 Client))。

Buffer:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer。

Selector 运行单线程处理多个 Channel。要使用 Selector, 得向 Selector 注册 Channel,然后调用 select() 方法。select() 会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。

Channel

Channel 中,可以进行异步的读写,数据需要读入进一个 Buffer 或者写入进一个 Buffer。

Channel 和 Buffer 关系图

FileChannel

read() 方法返回的int 值表示了有多少字节被读到了Buffer 中。如果返回-1,表示到了文件末尾。

通过调用 position() 方法获取 FileChannel 的当前位置。也可以通过调用 position(long pos) 方法设置 FileChannel 的当前位置。

FileChannel 实例的 size() 方法将返回该实例所关联文件的大小。

可以使用 FileChannel.truncate() 方法截取一个文件,文件将中指定长度后面的部分将被删除。channel.truncate(1024);,截取文件的前1024 个字节。

// 读取数据的例子
public class Test {
    public static void main(String[] args) throws IOException {
        // 获取对应文件
        RandomAccessFile aFile = new RandomAccessFile("D:\\txt\\1.txt", "rw");
        // 文件开启 Channel
        FileChannel inChannel = aFile.getChannel();
        // 创建 buffer 并分配空间
        ByteBuffer buf = ByteBuffer.allocate(48);
        int bytesRead = inChannel.read(buf);
        // 读取到文件末尾,返回-1
        while (bytesRead != -1) {
            System.out.println("Read " + bytesRead);
            // 反转读写模式
            buf.flip();
            while (buf.hasRemaining()) {
                // 读取数据,并输出
                System.out.print((char) buf.get());
            }
            // 清除缓冲区的内容
            buf.clear();
            bytesRead = inChannel.read(buf);
        }
        inChannel.close();
        aFile.close();
        System.out.println();
        System.out.println("End of file");
    }
}

/*==========================================================================*/
// 写入数据例子
public class Test {
    public static void main(String[] args) throws IOException {
        RandomAccessFile aFile = new RandomAccessFile("D:\\txt\\2.txt", "rw");
        FileChannel inChannel = aFile.getChannel();

        String data = "New String to write to file..." + LocalDateTime.now();
        ByteBuffer buf1 = ByteBuffer.allocate(1024);
        buf1.clear();
        buf1.put(data.getBytes());
        buf1.flip();
        // 判断是否有剩余数据
        while (buf1.hasRemaining()) {
            inChannel.write(buf1);
        }
        inChannel.close();
        aFile.close();
        System.out.println();
        System.out.println("End of file");
    }
}

分散(scatter)从 Channel 中读取是指在读操作时将读取的数据写入多个 Buffer 中。因此,Channel 将从 Channel 中读取的数据“分散(scatter)”到多个 Buffer 中。
聚集(gather)写入 Channel 是指在写操作时将多个 Buffer 的数据写入同一个 Channel,因此,Channel 将多个 Buffer 中的数据“聚集(gather)”后发送到 Channel。

scatter 在读取的时候,需要将 buffer 的空间填满,才能读取对应数据,gather 则没有这个限制。

SocketChannel
  • SocketChannel 用来连接 Socket 套接字;
  • SocketChannel 用途用来处理网络 I/O 的通道;
  • SocketChannel 基于 TCP 连接传输;
  • SocketChannel 实现了可选择通道,可以被多路复用。

SocketChannel 是支持非阻塞 socket,改进传统单向流 API,同时支持读写。可以设置 Channel 的(非)阻塞模式,调用 configureBlocking(),通过 isBlocking() 方法判断模式。

public class Test {
    public static final String GREETING = "Hello, World!\n";

    public static void main(String[] args) throws Exception {
        int port = 1234;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        // 使用 open 打开 ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        // false 表示非阻塞,true 表示阻塞
        ssc.configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connection...");
            // 使用 accept 接受连接,如果是阻塞模式会在这里进行阻塞
            SocketChannel sc = ssc.accept();
            if (sc == null) {
                System.out.println("No connection");
                Thread.sleep(2000);
            } else {
                System.out.println("Connection established: " + sc.socket().getRemoteSocketAddress());
                // 重置 buffer 的状态
                buffer.rewind();
                sc.write(buffer);
                sc.close();
            }
        }
    }
}
DatagramChannel

DatagramChannel 模拟包导向的无连接协议(如 UDP/IP)。DatagramChannel 是无连接的,每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)

@DisplayName("发包的 DatagramChannel")
@Test
public void sendDatagram() throws IOException, InterruptedException {
    // 使用 try-catch-resource 的方式,在结束自动执行 close() 方法
    try (DatagramChannel sendChannel = DatagramChannel.open()) {
        InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);
        while (true) {
            sendChannel.send(ByteBuffer.wrap(
                "测试包发送".getBytes(StandardCharsets.UTF_8)),
                             sendAddress
                            );
            log.info("发包端发包");
            Thread.sleep(2000);
        }
    }
}

@DisplayName("收包端 DatagramChannel")
@Test
public void receive() throws IOException {
    try (DatagramChannel receiveChannel = DatagramChannel.open()) {
        InetSocketAddress receiveAddress = new InetSocketAddress(9999);
        // 绑定端口为9999
        receiveChannel.bind(receiveAddress);
        ByteBuffer receiveBuffer = ByteBuffer.allocate(512);
        while (true) {
            receiveBuffer.clear();
            // 可以获取发包端的 ip + 端口等信息
            // 格式:/127.0.0.1:58857
            SocketAddress sendAddress = receiveChannel.receive(receiveBuffer);
            receiveBuffer.flip();
            log.info(sendAddress.toString());
            log.info("收包端收包:{}", StandardCharsets.UTF_8.decode(receiveBuffer));
        }
    }
}

@Test
public void testConnect1() throws IOException {
    try (DatagramChannel connChannel = DatagramChannel.open()) {
        connChannel.bind(new InetSocketAddress(9998));
        connChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        // 向9999端口发送信息
        connChannel.write(ByteBuffer.wrap("发包".getBytes(StandardCharsets.UTF_8)));
        ByteBuffer readBuffer = ByteBuffer.allocate(512);
        while (true) {
            try {
                readBuffer.clear();
                connChannel.read(readBuffer);
                readBuffer.flip();
                log.info(StandardCharsets.UTF_8.decode(readBuffer).toString());
            } catch (Exception e) {
            }
        }
    }
}
Buffer

在 NIO 库中,所有数据都是用缓冲区处理。

Buffer 继承链

当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。

@Test
@DisplayName("IntBuffer 测试")
public void IntBufferTest() {
    // 分配新的int 缓冲区,参数为缓冲区容量
	// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。
	// 它将具有一个底层实现数组,其数组偏移量将为零。
    IntBuffer intBuffer = IntBuffer.allocate(10);
    for (int i = 0; i < intBuffer.capacity(); i++) {
        // 将给定整数写入此缓冲区的当前位置,当前位置递增
        intBuffer.put(i * 2 + 1);
    }
    // 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
    intBuffer.flip();
    while (intBuffer.hasRemaining()) {
        log.info("intBuffer.get() = {}", intBuffer.get());
    }
}
基本属性/方法

Buffer 具有三个属性:

  • capacity:Buffer 有一个固定的大小值,有一个容量;

  • position:

    • 写入数据:

      position 表示写入数据的当前位置,position 的初始值为0,position 最大可为capacity – 1。

    • 读取数据:

      position 表示读入数据的当前位置,如 position=2 时表示已开始读入了3个 byte,或从第3个 byte 开始读取。通过ByteBuffer.flip() 切换到读模式时 position 会被重置为0。

  • limit:

    • 写入数据:

      limit 表示可对 Buffer 最多写入多少个数据。写模式下,limit 等于 Buffer 的 capacity。

    • 读取数据:

      limit 表示 Buffer 里有多少可读数据(not null 的数据),因此能读到之前写入的所有数据(limit 被设置成已写数据的数量,这个值在写模式下就是 position,通俗来说,就是你能读取到对应的数据的位置)。

Buffer 方法:

  • rewind():将 position 重置为0,limit 保持不变;
  • clear() & compact():
    • clear():position 重置为0,limit 被设置为 capacity 的值,如果存在未读数据,数据将“被遗忘”,不会被标记出来。Buffer 中的数据并未清除;
    • compact():所有未读的数据拷贝到 Buffer 起始处。将 position 设为最后一个未读元素之后。limit 属性依然设置 capacity。Buffer 写入数据时不会覆盖未读数据;
  • mark() & reset():mark() 可以标记 Buffer 中的一个特定 position,调用Buffer.reset()方法恢复到这个position。
@Test
public void testConnect3() throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(10);
    // 缓冲区中的数据0-9
    for (int i = 0; i < buffer.capacity(); ++i) {
        buffer.put((byte) i);
    }
    // 创建子缓冲区
    buffer.position(3);
    // 设置最多操作到第七位的数据
    buffer.limit(7);
    // slice 与原来的 buffer 共享数据
    ByteBuffer slice = buffer.slice();
    // 改变子缓冲区的内容
    for (int i = 0; i < slice.capacity(); ++i) {
        byte b = slice.get(i);
        b *= 10;
        slice.put(i, b);
    }

    buffer.position(0);
    buffer.limit(buffer.capacity());
    while (buffer.remaining() > 0) {
        System.out.println(buffer.get());
    }
    /*
    0
    1
    2
    30
    40
    50
    60
    7
    8
    9
    */
}
缓冲区操作

调用buffer.asReadOnlyBuffer();,创建一个只读缓冲区,原缓冲区内容变化时,只读缓冲区也随之变化。

allocateDirect()生成一个直接缓冲区,尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。

Selector

更少的线程处理通道,相比使用多个线程,避免了线程上下文切换带来的开销。

Selector 功能图示

SelectableChannel 类提供了实现通道的可选择性所需要的公共方法,socket 通道都继承自这个父类,而 FileChannel 类没有。

使用 Channel.register(Selector sel, int ops) 方法,将一个通道注册到一个选择器。

第二个参数表示查询的通道操作,包括以下四种(多种模式可以通过|连接):

  • 可读: SelectionKey.OP_READ;
  • 可写: SelectionKey.OP_WRITE;
  • 连接: SelectionKey.OP_CONNECT;
  • 接收: SelectionKey.OP_ACCEPT。

使用 select() 可以查询出已经就绪的通道操作,就绪的状态集合存放在Set<SelectionKey>集合中。

select() 有几个重载的方法:

  • select():阻塞到至少有一个通道在你注册的事件上就绪了;
  • select(long timeout):和 select() 一样,但最长阻塞事件为 timeout 毫秒;
  • selectNow():非阻塞,只要有通道就绪就立刻返回。

select() 方法返回的 int 值,表示有多少通道已经就绪,更准确的说,是自前一次 select方法以来到这一次 select 方法之间的时间段上,有多少通道变成就绪状态。

@Test
public void ServerDemo() {
    try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 9000));
        // 设置非阻塞,因为 Selector 只能接受非阻塞通道
        // 如果是阻塞通道,那么这里就会抛出 IOException
        // 所以 FileChannel 不能注册到 Selector 上
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        // 通道不是支持所有的模式,ServerSocketChannel 支持 Accept
        // 查看类内部的 validOps() 方法,获取通道支持的模式
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        writeBuffer.put("received".getBytes());
        writeBuffer.flip();

        while (true) {
            int nReady = selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();

            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isAcceptable()) {
                    // 创建新的连接,并且把连接注册到 selector
                    // 声明 channel 只对读操作感兴趣
                    SocketChannel socketChannel = ssc.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 获取 key 对应的 channel
                    try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
                        readBuffer.clear();
                        socketChannel.read(readBuffer);

                        readBuffer.flip();
                        log.info("received: " + StandardCharsets.UTF_8.decode(readBuffer));
                        key.interestOps(SelectionKey.OP_WRITE);
                    }
                } else if (key.isWritable()) {
                    writeBuffer.rewind();
                    try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
                        socketChannel.write(writeBuffer);
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
                it.remove();
            }
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Test
public void ClientDemo() {
    try (SocketChannel socketChannel = SocketChannel.open()) {
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000));

        ByteBuffer writeBuffer = ByteBuffer.allocate(32);
        ByteBuffer readBuffer = ByteBuffer.allocate(32);

        writeBuffer.put("hello server".getBytes());
        writeBuffer.flip();

        while (true) {
            writeBuffer.rewind();
            socketChannel.write(writeBuffer);
            readBuffer.clear();
            socketChannel.read(readBuffer);
        }
    } catch (IOException e) {
    }
}

4.2.3 Pipe

Pipe 用于实现两个线程之间数据的单向流通。

@Test
public void testPipe() throws IOException {
    // 1、获取通道
    Pipe pipe = Pipe.open();
    // 2、获取 sink 管道,用来传送数据
    Pipe.SinkChannel sinkChannel = pipe.sink();
    // 3、申请一定大小的缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    byteBuffer.put("hello, world".getBytes());
    byteBuffer.flip();
    // 4、sink 发送数据
    sinkChannel.write(byteBuffer);
    // 5、创建接收 pipe 数据的 source 管道
    Pipe.SourceChannel sourceChannel = pipe.source();
    // 6、接收数据,并保存到缓冲区中
    ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
    int length = sourceChannel.read(byteBuffer2);
    System.out.println(new String(byteBuffer2.array(), 0, length));
    sourceChannel.close();
    sinkChannel.close();
}

4.3 AIO(NIO 2)

Java AIO异步非阻塞,AIO 引入了异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

异步 IO 是基于事件和回调机制实现的,AIO 模式不需要 selector 操作,而是事件驱动形式。当客户端发送数据之后,会主动通知服
务器,接着服务器再进行读写操作。

5. Shiro

想要使用 Shiro 中,需要添加以下依赖:

<dependency>
    <groupId>org.apache.shiro</groupId>
    <artifactId>shiro-spring-boot-web-starter</artifactId>
    <version>1.9.0</version>
</dependency>
  1. UsernamePasswordToken,Shiro 用来封装用户登录信息,使用用户的登录信息创建令牌 Token,登录的过程即 Shiro 验证令牌是否具有合法身份以及相关权限;
  2. SecurityManager,Shiro 的核心部分,负责安全认证与授权;
  3. Subject,Shiro 的一个抽象概念,包含了用户信息;
  4. Realm,开发者自定义的模块,根据项目的需求,验证和授权的逻辑在 Realm 中实现;
  5. AuthenticationInfo,用户的角色信息集合,认证时使用;
  6. AuthorizationInfo,角色的权限信息集合,授权时使用;
  7. DefaultWebSecurityManager,安全管理器,开发者自定义的 Realm 需要注入到 DefaultWebSecurityManager 进行管理才能生效;
  8. ShiroFilterFactoryBean,过滤器工厂,Shiro 的基本运行机制是开发者定制规则,Shiro 去执行,具体的执行操作就是由 ShiroFilterFactoryBean 创建一个个 Filter 对象来完成。

Shiro 运行流程

5.1 SpringBoot 整合 JWT

首先配置自己的 Realm,最简单使用账号密码。AuthenticationInfo 作为认证部分,AuthorizationInfo 作为授权部分。

import fun.sast.shirotest.entity.Account;
import fun.sast.shirotest.exception.LocalRuntimeException;
import fun.sast.shirotest.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.*;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.authz.SimpleAuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.shiro.subject.Subject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

@Component
@Slf4j
public class UserRealm extends AuthorizingRealm {
    // 因为时候要配置 config,所以这里使用字段注入
    @Autowired
    private AccountService accountService;

    // 授权
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(
            PrincipalCollection principalCollection) {
        // 获取当前登录对象
        Subject subject = SecurityUtils.getSubject();
        Account account = (Account) subject.getPrincipal();

        // 设置角色
        Set<String> roles = new HashSet<>();
        roles.add(account.getRole().toString());
        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo(roles);

        //设置权限
        // 因为表中并没有权限字段,这里忽略
        // info.addStringPermission(account.getPerms());
        info.addRoles(roles);
        return null;
    }

    // 认证
    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(
            AuthenticationToken authenticationToken) throws AuthenticationException {
        // 客户端传来的 username 和 password 会自动封装到 token
        UsernamePasswordToken token = (UsernamePasswordToken) authenticationToken;
        Account account = accountService.getAccountByUsername(token.getUsername());
        if (account == null) {
            log.error("用户不存在");
            throw new LocalRuntimeException("用户不存在");
        }
        return new SimpleAuthenticationInfo(account, account.getPassword(), getName());
    }
}

认证过滤器:

anon:无需认证即可访问,游客身份。

authc:必须认证(登录)才能访问。

authcBasic:需要通过 httpBasic 认证。

user:不一定已通过认证,只要是曾经被 Shiro 记住过登录状态的用户就可以正常发起请求,比如 rememberMe。

授权过滤器:

perms:必须拥有对某个资源的访问权限(授权)才能访问。

role:必须拥有某个角色权限才能访问。

port:请求的端口必须为指定值才可以访问。

rest:请求必须是 RESTful,method 为 post、get、delete、put。

ssl:必须是安全的 URL 请求,协议为 HTTPS。

import fun.sast.shirotest.common.content.UserRealm;
import org.apache.shiro.spring.web.ShiroFilterFactoryBean;
import org.apache.shiro.web.mgt.DefaultWebSecurityManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ShiroConfig {
    // 通过 @Qualifier 注入创建好的 Bean
    @Bean
    public ShiroFilterFactoryBean filterFactoryBean(
        @Qualifier("manager") DefaultWebSecurityManager manager){
        ShiroFilterFactoryBean factoryBean = new ShiroFilterFactoryBean();
        factoryBean.setSecurityManager(manager);
        Map<String,String> map = new HashMap<>();
        map.put("/main","authc");
        map.put("/manage","perms[manage]");
        map.put("/administrator","roles[administrator]");
        factoryBean.setFilterChainDefinitionMap(map);
        //设置登录页面
        factoryBean.setLoginUrl("/login");
        //未授权页面
        factoryBean.setUnauthorizedUrl("/unauth");
        return factoryBean;
    }

    @Bean
    public DefaultWebSecurityManager manager(@Qualifier("userRealm") UserRealm userRealm){
        DefaultWebSecurityManager manager = new DefaultWebSecurityManager();
        manager.setRealm(userRealm);
        return manager;
    }

    @Bean
    public UserRealm userRealm(){
        return new UserRealm();
    }
}

Shiro 的问题是不能返回 json,只能跳转对应 url,在前后端分离式开发中比较麻烦。

import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.IncorrectCredentialsException;
import org.apache.shiro.authc.UnknownAccountException;
import org.apache.shiro.authc.UsernamePasswordToken;
import org.apache.shiro.subject.Subject;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;

// 因为返回的是 url,所以没有必要添加 @ResponseBody 的注解
@Controller
@Slf4j
@RequestMapping("account")
public class LoginController {

    @GetMapping("/{url}")
    public String redirect(@PathVariable("url") String url) {
        return url;
    }

    @PostMapping("/login")
    public String login(String username, String password, Model model) {
        Subject subject = SecurityUtils.getSubject();
        UsernamePasswordToken token = new UsernamePasswordToken(username, password);
        try {
            subject.login(token);
            return "index";
        } catch (UnknownAccountException e) {
            model.addAttribute("msg", "用户名错误");
            return "login";
        } catch (IncorrectCredentialsException e) {
            model.addAttribute("msg", "密码错误");
            return "login";
        }
    }

    @RequestMapping("/unauth")
    @ResponseBody
    public String unauth() {
        return "未授权没有访问权限";
    }
}

5.2 结合 JWT

  1. POST 用户名与密码到 /login 进行登入,如果成功返回一个加密 token,失败的话直接返回 404 错误;
  2. 之后用户访问每一个需要权限的网址请求需在 header 中添加 Authorization 字段,例如 Authorization: tokentoken 为密钥;
  3. 后台会进行 token 的校验,如果有误会直接返回 404。

5.2.1 实现 JWTToken

JWTTokenShiro 用户名密码的载体,在前后端分离开发中,服务器无需保存用户状态,不需要 RememberMe 这类功能,简单的实现 AuthenticationToken 接口即可。token 中存放用户 uid。

import org.apache.shiro.authc.AuthenticationToken;

public class JWTToken implements AuthenticationToken {
    // 密钥
    private String token;

    public JWTToken(String token) {
        this.token = token;
    }

    @Override
    public Object getPrincipal() {
        return token;
    }

    @Override
    public Object getCredentials() {
        return token;
    }
}

5.2.2 实现 Realm

realm 的用于处理用户是否合法,自定义鉴权中,需要自定义。

import com.sast.jwt.common.enums.CustomError;
import com.sast.jwt.entity.Account;
import com.sast.jwt.exception.LocalRuntimeException;
import com.sast.jwt.service.AccountService;
import com.sast.jwt.utils.JWTUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.authc.AuthenticationException;
import org.apache.shiro.authc.AuthenticationInfo;
import org.apache.shiro.authc.AuthenticationToken;
import org.apache.shiro.authc.SimpleAuthenticationInfo;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.authz.SimpleAuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyRealm extends AuthorizingRealm {
    private final JWTUtil jwtUtil;

    private final AccountService accountService;

    public MyRealm(AccountService accountService, JWTUtil jwtUtil) {
        this.accountService = accountService;
        this.jwtUtil = jwtUtil;
    }
    
    /*
     * 大坑!,必须重写此方法,不然 Shiro 会报错
     * 这里重写 token 的验证
     */
    @Override
    public boolean supports(AuthenticationToken token) {
        return token instanceof JWTToken;
    }

    // 默认使用此方法进行用户名正确与否验证,错误抛出异常即可。
    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken auth)
            throws AuthenticationException {
        String token = (String) auth.getCredentials();
        Long userId = jwtUtil.getId(token);
        Account account = accountService.getAccountById(userId);
        if (account == null) {
            throw new LocalRuntimeException(CustomError.NO_USER);
        }
        return new SimpleAuthenticationInfo(account.getUsername(), token, "my_realm");
    }
    
    /*
     * 只有当需要检测用户权限的时候才会调用此方法
     * 例如 checkRole,checkPermission 之类的
     */
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
        Account user = accountService.getAccountByUsername(principals.toString());
        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
        info.addRole(user.getRole().toString());
        return info;
    }
}

5.2.3 重写 Filter

所有的请求都会先经过 Filter,需要继承官方的 BasicHttpAuthenticationFilter ,并且重写鉴权的方法。

代码的执行流程 preHandle -> isAccessAllowed -> isLoginAttempt -> executeLogin

import com.sast.jwt.common.contents.JWTToken;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMethod;

import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Component
@Slf4j
// 代码的执行流程 preHandle -> isAccessAllowed -> isLoginAttempt -> executeLogin
public class JWTFilter extends BasicHttpAuthenticationFilter {
    @Override
    protected boolean preHandle(ServletRequest request, ServletResponse response) throws Exception {
        HttpServletRequest httpServletRequest = (HttpServletRequest) request;
        HttpServletResponse httpServletResponse = (HttpServletResponse) response;
        httpServletResponse.setHeader("Access-control-Allow-Origin",
                httpServletRequest.getHeader("Origin"));
        httpServletResponse.setHeader("Access-Control-Allow-Methods",
                "GET,POST,OPTIONS,PUT,DELETE");
        httpServletResponse.setHeader("Access-Control-Allow-Headers",
                httpServletRequest.getHeader("Access-Control-Request-Headers"));
        // 跨域时会首先发送一个 option 请求,这里我们给 option 请求直接返回正常状态
        if (httpServletRequest.getMethod().equals(RequestMethod.OPTIONS.name())) {
            httpServletResponse.setStatus(HttpStatus.OK.value());
            return false;
        }
        return super.preHandle(request, response);
    }

    /*
     * 将非法请求跳转到 /404
     */
    private void response404(ServletRequest req, ServletResponse resp) {
        try {
            HttpServletResponse httpServletResponse = (HttpServletResponse) resp;
            httpServletResponse.sendRedirect("/404");
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /*
     * 这里我们详细说明下为什么最终返回的都是 true,即允许访问
     * 例如我们提供一个地址 GET /article
     * 登入用户和游客看到的内容是不同的
     * 如果在这里返回了 false,请求会被直接拦截,用户看不到任何东西
     * 所以我们在这里返回 true,Controller 中可以通过 subject.isAuthenticated() 来判断用户是否登入
     * 如果有些资源只有登入用户才能访问,我们只需要在方法上面加上 @RequiresAuthentication 注解即可
     * 但是这样做有一个缺点,就是不能够对 GET,POST 等请求进行分别过滤鉴权
     * (因为我们重写了官方的方法),但实际上对应用影响不大
     */
    @Override
    protected boolean isAccessAllowed(ServletRequest request,
                                      ServletResponse response,
                                      Object mappedValue) {
        if (isLoginAttempt(request, response)) {
            try {
                executeLogin(request, response);
            } catch (Exception e) {
                response404(request, response);
            }
        }
        return true;
    }

    /*
     * 判断用户是否想要登入。
     * 检测 header 里面是否包含 Authorization 字段即可
     */
    @Override
    protected boolean isLoginAttempt(ServletRequest request, ServletResponse response) {
        HttpServletRequest req = (HttpServletRequest) request;
        String authorization = req.getHeader("Authorization");
        return authorization != null;
    }

    @Override
    protected boolean executeLogin(ServletRequest request, ServletResponse response) throws Exception {
        HttpServletRequest httpServletRequest = (HttpServletRequest) request;
        String authorization = httpServletRequest.getHeader("Authorization");

        JWTToken token = new JWTToken(authorization);
        // 提交给 realm 进行登入,如果错误他会抛出异常并被捕获
        // 这一步就是提交给了 realm 进行处理。
        getSubject(request, response).login(token);
        // 如果没有抛出异常则代表登入成功,返回true
        return true;
    }
}

5.2.4 配置 Shiro

import com.sast.jwt.common.contents.MyRealm;
import com.sast.jwt.filter.JWTFilter;
import org.apache.shiro.mgt.DefaultSessionStorageEvaluator;
import org.apache.shiro.mgt.DefaultSubjectDAO;
import org.apache.shiro.spring.LifecycleBeanPostProcessor;
import org.apache.shiro.spring.security.interceptor.AuthorizationAttributeSourceAdvisor;
import org.apache.shiro.spring.web.ShiroFilterFactoryBean;
import org.apache.shiro.web.mgt.DefaultWebSecurityManager;
import org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import javax.servlet.Filter;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class ShiroConfig {
    @Bean("securityManager")
    public DefaultWebSecurityManager getManager(MyRealm realm) {
        DefaultWebSecurityManager manager = new DefaultWebSecurityManager();
        // 使用自己的 realm
        manager.setRealm(realm);

        /*
         * 关闭 Shiro 自带的 session,详情见文档
         * http://shiro.apache.org/session-management.html#SessionManagement-StatelessApplications%28Sessionless%29
         */
        DefaultSubjectDAO subjectDAO = new DefaultSubjectDAO();
        DefaultSessionStorageEvaluator defaultSessionStorageEvaluator =
                new DefaultSessionStorageEvaluator();
        defaultSessionStorageEvaluator.setSessionStorageEnabled(false);
        subjectDAO.setSessionStorageEvaluator(defaultSessionStorageEvaluator);

        manager.setSubjectDAO(subjectDAO);
        return manager;
    }

    // 如果没有规定 ShiroFilterFactoryBean 会bao
    @Bean("ShiroFilterFactoryBean")
    public ShiroFilterFactoryBean factory(DefaultWebSecurityManager securityManager) {
        ShiroFilterFactoryBean factoryBean = new ShiroFilterFactoryBean();

        // 添加自己的过滤器并且取名为 jwt
        Map<String, Filter> filterMap = new HashMap<>();
        filterMap.put("jwt", new JWTFilter());
        factoryBean.setFilters(filterMap);

        factoryBean.setSecurityManager(securityManager);
        factoryBean.setUnauthorizedUrl("/401");

        /*
         * 自定义url规则
         * http://shiro.apache.org/web.html#urls-
         * 定义对应页面的规则
         */
        Map<String, String> filterChain = new HashMap<>();
        // 所有请求通过我们自己的 JWT Filter
        filterChain.put("/**", "jwt");
        // 登录/注册相关的请求不通过我们的JWT Filter
        filterChain.put("/user/login", "anon");
        filterChain.put("/user/register", "anon");
        // 访问 401和 404 页面不通过我们的 Filter
        // 可惜 Shiro 只能跳转页面,不能返回 json 404 页面
        filterChain.put("/404", "anon");
        filterChain.put("/401", "anon");
        factoryBean.setFilterChainDefinitionMap(filterChain);
        return factoryBean;
    }

    /**
     * 下面的代码是添加注解支持
     */
    @Bean
    @DependsOn("lifecycleBeanPostProcessor")
    public DefaultAdvisorAutoProxyCreator defaultAdvisorAutoProxyCreator() {
        DefaultAdvisorAutoProxyCreator defaultAdvisorAutoProxyCreator = new DefaultAdvisorAutoProxyCreator();
        // 强制使用 cglib,防止重复代理和可能引起代理出错的问题
        // https://zhuanlan.zhihu.com/p/29161098
        defaultAdvisorAutoProxyCreator.setProxyTargetClass(true);
        return defaultAdvisorAutoProxyCreator;
    }

    @Bean
    public LifecycleBeanPostProcessor lifecycleBeanPostProcessor() {
        return new LifecycleBeanPostProcessor();
    }

    @Bean
    public AuthorizationAttributeSourceAdvisor
    authorizationAttributeSourceAdvisor(DefaultWebSecurityManager securityManager) {
        AuthorizationAttributeSourceAdvisor advisor = new AuthorizationAttributeSourceAdvisor();
        advisor.setSecurityManager(securityManager);
        return advisor;
    }
}

6. ElasticSearch

ElasticSearch 是一款非常强大的、基于 Lucene 的开源搜索及分析引擎,是一个实时的分布式搜索分析引擎。基于 Restful WebApi,使用 Java 语言开发。

ES 对于传统 RDBM

6.1 配置安装

配置主要针对于 ver 8,而 ES 在这里添加了新的配置,导致一些原始的配置无法使用。

6.1.1 Windows

ES

在官网下载对应 ES 安装包,解压之后打开 bin 目录下的 ES 的脚本文件,启动完成就可以在 localhost:9200 端口查看。

启动成功示例

如果启动成功却无法访问网页,是因为配置文件中的安全配置默认开启,需设置 xpack.security.enabled: false,此配置会默认开启 SSL 认证。

在修改这个配置之后会导致 ES 的安全性检测问题,使得一些命令被禁止使用,并且会在使用 Kibana 时遇到一些问题,显示证书验证失效。

显示集群安全问题,无法执行重置操作

在启动无问题之后,可以用 elasticsearch-service.bat install 将 ES 添加到 Windows 服务中。

在 ES 第一次启动时,会提供 Password、token 等配置,在后续配置其他管理工具使用,也可以针对集群进行配置。

token 的有效时间为 30 分钟,后续使用需要内部运行程序重新生成

head 插件

对于 ES 节点的管理,可以使用官方的 head 插件。在这之前需要 node 环境,运行 npm install -g grunt-cli 安装 grunt。

在下载好 head 文件之后,进入文件夹 npm install 进行安装。在安装过程中可能会出现 JS core 的版本问题,使用 npm install npm@6.14.13 -g 进行回退。

# 在 ES 配置文件中添加以下配置将 head 添加
http.cors.enabled: true 
http.cors.allow-origin: "*"

安装完成之后 npm run startgrunt server,启动 head 插件(如果显示端口占用,可以在 Gruntfile.js 文件中修改端口)。

Kibana

Kibana 是开源的可视化管理工具,需要和 ES 版本一致。

在 ver 8 之后,官方禁止直接使用 elastic 用户进行连接,推荐 Service Account token(目前还不知道怎么实现)。在本地测试中,可以直接使用 localhost:9200 进行手动配置连接。

显示 elastic 用户权限过高,无法用于连接

# 默认使用语言是 EN
i18n.locale: "zh-CN"

# 如果配置 IP 访问需要配置,默认是本地的 9200 端口
elasticsearch.hosts: ["http://localhost:9200"]

6.2 ES 操作

针对 ES 可以使用之前的 head 插件,也能用更方便的 Kinbana 直接进行可视化管理。因为本身是个 No SQL,可以通过 Postman 调用 URL 进行操作。

使用 Postman 连接本地 ES 操作

在 Kibana 的开发者工具中进行测试

一个索引可以存储超出单个节点硬件限制的大量数据。或者单个节点处理搜索请求,响应太慢。为了解决这个问题,Elasticsearch 提供了将索引划分成多份的能力,每一份就称之为分片(Shards)。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点上。

6.2.1 索引管理

在 ES 中,能搜索的数据必须索引,这样可以加快速度。

PUT /bank
# 创建索引,之后可以在索引里添加数据
# 不能使用 POST,因为 POST 没有幂等性

DELETE /bank
# 删除索引

GET /_cat/indices?v
# 查询目前所有索引

在索引中,可以使用 type 存储不同类型的数据,但是在 ver 7 之后只能使用 _doc 存储数据。

版本 Type
5.x 支持多种 type
6.x 只能有一种 type
7.x 默认不再支持自定义索引类型(默认类型为:_doc)

6.2.2 数据存储

POST /bank/_doc 
{
  "name": "cxy621",
  "age": 18
}
# 每个数据会生成一个唯一 id
# 文档中插入数据只能使用 POST,非幂等性每次会生成不同的 id
# 如果 id 重复,会自动变成修改操作

GET /bank/_doc/fgHXFoIBONKzPt9m8wAN
# 获取对应 id 数据

GET /_cat/indices?v
# 查询所有索引情况

查询结果

# 获取索引下的所有内容
GET /shopping/_search

查询所有数据返回数据

  • took:Elasticsearch运行查询所花费的时间(以毫秒为单位);

  • timed_out:搜索请求是否超时;

  • _shards:搜索了多少个碎片,以及成功,失败或跳过了多少个碎片的细目分类;

  • max_score:找到的最相关文档的分数;

  • hits.total.value:找到了多少个匹配的文档;

  • hits.sort:文档的排序位置(不按相关性得分排序时);

  • hits._score:文档的相关性得分(使用 match_all 时不适用,在条件查询中会根据 score 来判断相关性的大小)。

6.2.3 条件查询

GET /shopping/_search
{
  "query": {"match_all": {}},
  "sort": [
    {
      "account_number": "desc"
    }
  ],
  "from": 3,
  "size": 2
}
# query 表示通过条件查询,match_all 表示全部
# sort 表示通过字段排序
# form, size 用于分页查询

# match 会匹配包含字段的内容
GET /shopping/_search
{
  "query":{
    "match": {
      "city": "南京"
    }
  }
}

# match_phrase 强制匹配字段
GET /shopping/_search
{
  "query": {
    "match_phrase": {
      "city": "南京"
    }
  }
}

ES 的分词器会让 match 匹配出包含字的数据

GET /shopping/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "city": "南京"
          }
        }
      ],
      "must_not": [
        {
          "match": {
            "_id": 4
          }
        }
      ]
    }
  }
}
# 通过 bool 进行复合查询

GET /bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "state": "ND"
          }
        }
      ],
      "filter": [
        {
          "term": {
            "age": "40"
          }
        },
        {
          "range": {
            "balance": {
              "gte": 20000,
              "lte": 30000
            }
          }
        }
      ]
    }
  }
}
# must 和 filter 的区别在于前者是会给文档打分,后者是直接过滤
# term 表示对应字段
# range 中 gte 表示大于,lte 表示小于

6.2.4 聚合查询

ES 中的聚合是 agg,表示聚合。

GET /bank/_search
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}
# keyword 表示对关键字的统计
# 每次查询会在 hits 中展示具体信息,size 设置为0表示不需要

GET /bank/_search
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword",
        "order": {
          "average_balance": "desc"
        }
      },
      "aggs": {
        "average_balance": {
          "avg": {
            "field": "balance"
          }
        }
      }
    }
  }
}
# 聚合中的字段进行自命名,可以嵌套使用
# avg 求平均值关键字
# 使用 order + 字段

6.3 整合 SpringBoot

操作 ES 需要添加 Spring Data 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
spring:  
  elasticsearch:
    host: localhost
    port: 9200

同 MinIO 一致,配置客户端连接,创建 Bean。

import org.elasticsearch.client.RestHighLevelClient;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

// 配置 ES 连接
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
    @Value("${spring.elasticsearch.host}")
    private String host;

    @Value("${spring.elasticsearch.port}")
    private String port;

    @NotNull
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final ClientConfiguration clientConfiguration =
                ClientConfiguration.builder()
                .connectedTo(host + ":" + port)
                .build();
        return RestClients.create(clientConfiguration).rest();
    }
}

客户端对象:

  • ElasticsearchOperations 通过偏向 oop 的方式操作;
  • RestHighLevelClient 类似 kibana ,通过 rest 操作(推荐)。

6.3.1 ElasticsearchOperations

创建映射实体类,将索引的中的对应 JSON 映射到对象中。

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
 
@Document(indexName = "products")
// 指定文档的索引名称
public class Product {
    @Id
    // 指定字段作为 _id
    private Integer id;
 
    @Field(type = FieldType.Keyword)
    private String title;
 
    @Field(type = FieldType.Float)
    private Double price;
 
    @Field(type = FieldType.Text,analyzer = "ik_max_word")
    // 指定映射类型和分词器
    private String description;
    //get set...
}

测试,添加返回删除。

@SpringBootTest
class DemoApplicationTests {
 
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;
 
    @Test
    void contextLoads() {
        Product product = new Product();
        product.setId(1);
        product.setTitle("iphone");
        product.setPrice(9999.0);
        product.setDescription("iphone with IOS");
 
        elasticsearchOperations.save(product);
        // save() 当文档 id 不存在时,创建文档
        // 当文档 id 存在时,更新文档
 
        Product res = elasticsearchOperations.get("1", Product.class);
 
        elasticsearchOperations.delete(product);
    }
}

6.3.2 RestHighLevelClient

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class ESUtil {
    private final RestHighLevelClient restHighLevelClient;

    public ESUtil(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * 判断索引是否存在
     *
     * @param indexName 索引名称
     * @return
     * @throws IOException
     */
    public boolean isIndexExist(String indexName) throws IOException {
        GetIndexRequest request = new GetIndexRequest(indexName);
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * 创建索引
     *
     * @param indexName   索引名
     * @param mappingJson 映射
     * @throws IOException
     */
    public void createIndex(String indexName, 
                            String mappingJson) throws IOException {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        if (mappingJson != null) {
            createIndexRequest.mapping(mappingJson, XContentType.JSON);
        }
        restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    }

    /**
     * 向指定索引中添加文档
     *
     * @param indexName 索引名
     * @param document  被添加的 JSON 文档
     * @param id        指定要添加的文档的 id,为 null 时 ES 会自动生成
     * @throws IOException
     */
    public void addDocument(String indexName, 
                            String document, String id) throws IOException {
        IndexRequest request = new IndexRequest(indexName);
        request.id(id).source(document, XContentType.JSON);
        restHighLevelClient.index(request, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 更新文档 (在原文档的基础上更新)
     *
     * @param indexName 索引名
     * @param id        id
     * @param document  更新内容
     */
    public void updateDocument(String indexName, 
                               String id, String document) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        updateRequest.doc(document, XContentType.JSON);
        restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    }

    /**
     * 删除指定文档
     *
     * @param indexName
     * @param id
     */
    public void deleteDocument(String indexName, 
                               String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
        restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 返回文档
     *
     * @param indexName
     * @param id
     * @return
     * @throws IOException
     */
    public Map<String, Object> getDocumentById(String indexName, 
                                               String id) throws IOException {
        GetRequest getRequest = new GetRequest(indexName, id);
        GetResponse documentFields = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        return documentFields.getSource();
    }


    /**
     * 封装分页条件查询
     *
     * @param indexName
     * @param queryBuilder
     * @param pageNum      起始位置(从0开始)
     * @param pageSize     每一页的数量
     * @return
     * @throws IOException
     */
    private SearchResponse query(String indexName,
                                 QueryBuilder queryBuilder,
                                 int pageNum, int pageSize) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder)
                .from((pageNum - 1) * pageSize)
                .size(pageSize);

        searchRequest.source(searchSourceBuilder);
        return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    }

    /**
     * term 查询
     *
     * @param indexName
     * @param fieldName
     * @param terms
     * @return
     * @throws IOException
     */
    public SearchResponse termQuery(String indexName, 
                                    String fieldName, String... terms) throws IOException {
        return this.query(indexName, QueryBuilders.termsQuery(fieldName, terms), 0, 10);
    }
}

参考文章

  1. MinIO 官方网站
  2. SpringBoot 集成 MinIO 8.3.X 依赖冲突解决
  3. SpringBoot 集成 MinIO
  4. SpringBoot 整合 Minio 对象存储服务
  5. Docker 安装 RabbitMQ 无法访问页面
  6. RabbitMQ 官方网站
  7. 认识一下 RabbitMQ
  8. SpringBoot 2.X 整合 RestTemplate
  9. Nacos 中 AP 和 CP 模式切换
  10. Windows 搭建 Nacos 服务
  11. 启动 nacos 集群出现端口占用问题
  12. Nacos 2.1.0 无法创建新的配置
  13. Nacos Docker 快速开始
  14. Netty 学习前基本知识
  15. Reactor 模式
  16. Redis 为什么这么快?
  17. Spring Boot 整合 Shiro 教程
  18. Shiro + JWT + Spring Boot Restful
  19. Windows 环境下 ES 安装教程
  20. Elasticsearch 8.0 received plaintext http traffic on an https channel, closing connection
  21. 【npm】core-js@2.6.12
  22. Windows 下安装和配置 Kibana
  23. ElasticSearch – 風楪fy

文章作者: 陈鑫扬
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 陈鑫扬 !
评论
  目录