赞
踩
分享不易,希望能收获您的点赞和收藏。
转载请注明出处
引入依赖:
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>redshiftdata</artifactId>
</dependency>
代码:
- import cn.hutool.core.collection.CollectionUtil;
- import cn.hutool.core.convert.Convert;
- import cn.hutool.core.lang.Assert;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.extra.spring.SpringUtil;
- import com.gz.common.config.RedshiftProperties;
- import lombok.Builder;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
- import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
- import software.amazon.awssdk.regions.Region;
- import software.amazon.awssdk.services.redshiftdata.RedshiftDataClient;
- import software.amazon.awssdk.services.redshiftdata.model.*;
-
- import java.util.*;
- import java.util.function.BiFunction;
- import java.util.function.Supplier;
-
- @Slf4j
- @Data
- public class RedshiftUtils {
-
-
/**
 * A command that can render itself as the text handed to the statement executors.
 */
public interface Command {

    /**
     * Renders this command.
     *
     * @return the command text
     */
    String getCommand();
}
-
- public static class CopyCommand implements Command {
- public static final String COPY_COMMAND = "copy {table} from '{s3Url}' delimiter '{delimiter}' iam_role '{iamRole}' {options};";
- private String table;
- private String s3Url;
- private String delimiter = ",";
- private String iamRole;
- private Map<String, String> options = new HashMap<>();
-
- @Builder
- public CopyCommand(String table, String s3Url, String delimiter, String iamRole) {
- this.table = table;
- this.s3Url = s3Url;
- this.delimiter = delimiter;
- this.iamRole = iamRole;
- }
-
- public Map<String, String> ignoreheaderOpt(Integer num) {
- options.put("ignoreheader", num.toString());
- return options;
- }
-
- public Map<String, String> addOptions(Map<String, String> options) {
- this.options.putAll(options);
- return options;
- }
-
-
- public String getCommand() {
- String command = COPY_COMMAND;
- HashMap<String, String> map = new HashMap<>();
- StringBuilder optionBuilder = new StringBuilder();
- if (CollectionUtil.isNotEmpty(options)) {
- options.forEach((k, v) -> {
- optionBuilder.append(k).append(" ").append(v).append(" ");
- });
- map.put("options", optionBuilder.toString());
- } else {
- command = command.replace("{options}", "");
- }
-
- map.put("table", this.table);
- map.put("s3Url", this.s3Url);
- map.put("delimiter", this.delimiter);
- map.put("iamRole", this.iamRole);
- return StrUtil.format(command, map);
- }
- }
-
- public static class UnLoadCommand implements Command {
- public static final String UNLOAD_COMMAND = "unload ('{sql}') to '{s3Url}' iam_role '{iamRole}' {options};";
- private String sql;
- private String s3Url;
- private String iamRole;
- private Map<String, String> options = new HashMap<>();
-
- @Builder
- public UnLoadCommand(String sql, String s3Url, String iamRole) {
- this.sql = sql;
- this.s3Url = s3Url;
- this.iamRole = iamRole;
- }
-
- public Map<String, String> addOptions(Map<String, String> options) {
- this.options.putAll(options);
- return options;
- }
-
-
- public String getCommand() {
- String command = UNLOAD_COMMAND;
- HashMap<String, String> map = new HashMap<>();
- StringBuilder optionBuilder = new StringBuilder();
- if (CollectionUtil.isNotEmpty(options)) {
- options.forEach((k, v) -> {
- optionBuilder.append(k).append(" ").append(v).append(" ");
- });
- map.put("options", optionBuilder.toString());
- } else {
- command = command.replace("{options}", "");
- }
-
- map.put("sql", this.sql);
- map.put("s3Url", this.s3Url);
- map.put("iamRole", this.iamRole);
-
- return StrUtil.format(command, map);
- }
- }
-
- private static String ACCESS_KEY;
- private static String SECRE_KEY;
- private static String DATABASE;
- private static String SECRE_ARN;
- private static String GROUP_NAME;
- private static Region REGION; // 设置您的区域;
-
- static {
- RedshiftProperties redshiftProperties = SpringUtil.getBean(RedshiftProperties.class);
- ACCESS_KEY = redshiftProperties.getAccessKey();
- SECRE_KEY = redshiftProperties.getSecretkey();
- DATABASE = redshiftProperties.getDatabase();
- SECRE_ARN = redshiftProperties.getArn();
- GROUP_NAME = redshiftProperties.getGroupName();
- REGION = Region.regions().stream().filter(e -> e.id().equals(redshiftProperties.getRegion())).findAny().orElseThrow();
- }
-
-
- public static RedshiftDataClient getReshiftDataClient() {
- return RedshiftDataClient
- .builder()
- .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(ACCESS_KEY, SECRE_KEY)))
- .region(REGION).build();
- }
-
- public static RedshiftDataClient getReshiftDataClient(Region region) {
- return RedshiftDataClient
- .builder()
- .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(ACCESS_KEY, SECRE_KEY)))
- .region(region).build();
- }
-
-
- /**
- * 执行sql语句
- *
- * @param sql
- * @return null or list
- */
- public static <T extends RedshiftDataResponse> T executeStatement(String sql, Class<T> retType) {
- return execQueryPipeline(Collections.singletonList(sql),
- RedshiftUtils::getReshiftDataClient,
- RedshiftUtils::executeQueryStatement,
- RedshiftUtils::executeDescribeStatement,
- RedshiftUtils::executeGetResultStatement,
- retType
- );
- }
-
-
- /**
- * 获取查询结果
- *
- * @param reshiftClient
- * @param excuteResponse
- * @return
- */
- public static GetStatementResultResponse executeGetResultStatement(RedshiftDataClient reshiftClient, ExecuteStatementResponse excuteResponse) {
- GetStatementResultRequest getStatementRequest = GetStatementResultRequest.builder()
- .id(excuteResponse.id())
- .build();
- return reshiftClient.getStatementResult(getStatementRequest);
- }
-
- /**
- * 获取sql语句执行状态和结果
- *
- * @param reshiftClient
- * @param excuteResponse
- * @return
- */
- public static DescribeStatementResponse executeDescribeStatement(RedshiftDataClient reshiftClient, ExecuteStatementResponse excuteResponse) {
- DescribeStatementRequest describeStatementRequest = DescribeStatementRequest.builder()
- .id(excuteResponse.id())
- .build();
- return reshiftClient.describeStatement(describeStatementRequest);
- }
-
- /**
- * 获取sql查询请求的响应
- *
- * @param reshiftClient
- * @param sql
- * @return {@link ExecuteStatementResponse}
- */
- public static ExecuteStatementResponse executeQueryStatement(RedshiftDataClient reshiftClient, List<String> sql) {
- ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder().secretArn(SECRE_ARN).workgroupName(GROUP_NAME).database(DATABASE).sql(sql.get(0)).build();
- return reshiftClient.executeStatement(statementRequest);
- }
-
-
- /**
- * 查询执行管道
- *
- * @param sql
- * @param clientSupply
- * @param execueQueryFuntion
- * @param executeDescribeFuntion
- * @param executeGetResultFuntion
- * @param expectRetType 可根据expectRetType进行流程执行控制
- * @param <C>
- * @param <T>
- * @param <G>
- * @param <D>
- * @param <R>
- * @return expectRetType
- */
- public static <C extends RedshiftDataClient,
- T extends RedshiftDataResponse,
- G extends GetStatementResultResponse,
- D extends DescribeStatementResponse,
- R extends RedshiftDataResponse> R execQueryPipeline(List<String> sql,
- Supplier<C> clientSupply,
- BiFunction<C, List<String>, T> execueQueryFuntion,
- BiFunction<C, T, D> executeDescribeFuntion,
- BiFunction<C, T, G> executeGetResultFuntion,
- Class<R> expectRetType
-
-
- ) {
- R retTypr = null;
- C client = clientSupply.get();
- try (client) {
- T queryResponse = execueQueryFuntion.apply(client, sql);
- DescribeStatementResponse describeResponse = executeDescribeFuntion.apply(client, queryResponse);
- Assert.isNull(describeResponse.error(), () -> {
- log.error("error detail:{}", describeResponse);
- return RedshiftDataException.builder().message(describeResponse.error()).build();
- });
- if (Objects.nonNull(expectRetType) && expectRetType.isAssignableFrom(DescribeStatementResponse.class)) {
- retTypr = Convert.convert(expectRetType, describeResponse);
- } else if (describeResponse.hasResultSet()) {
- GetStatementResultResponse getResultResponse = executeGetResultFuntion.apply(client, queryResponse);
- if (Objects.nonNull(expectRetType) && expectRetType.isAssignableFrom(GetStatementResultResponse.class)) {
- retTypr = Convert.convert(expectRetType, getResultResponse);
- }
- }
- }
- return retTypr;
- }
-
- /**
- * 执行unload到s3
- *
- * @return
- */
- public static DescribeStatementResponse executeUnload(UnLoadCommand unLoadCommand) {
- return execQueryPipeline(Collections.singletonList(unLoadCommand.getCommand()),
- RedshiftUtils::getReshiftDataClient,
- RedshiftUtils::executeQueryStatement,
- RedshiftUtils::executeDescribeStatement,
- RedshiftUtils::executeGetResultStatement,
- DescribeStatementResponse.class
- );
- }
-
- /**
- * 执行copy
- *
- * @return
- */
- public static DescribeStatementResponse executeCopy(CopyCommand command) {
- return execQueryPipeline(Collections.singletonList(command.getCommand()),
- RedshiftUtils::getReshiftDataClient,
- RedshiftUtils::executeQueryStatement,
- RedshiftUtils::executeDescribeStatement,
- RedshiftUtils::executeGetResultStatement,
- DescribeStatementResponse.class
- );
- }
-
- /**
- * 批处理
- *
- * @param sqls
- * @return
- */
- public static <T extends RedshiftDataResponse> T batchExecuteStatement(List<String> sqls, Class<T> retType) {
-
- return execQueryPipeline(
- sqls,
- RedshiftUtils::getReshiftDataClient,
- (cl, s) -> {
- BatchExecuteStatementRequest request = BatchExecuteStatementRequest.builder().secretArn(SECRE_ARN).workgroupName(GROUP_NAME).database(DATABASE).sqls(sqls).build();
- return cl.batchExecuteStatement(request);
- },
- (cl, re) -> {
- DescribeStatementRequest describeStatementRequest = DescribeStatementRequest.builder()
- .id(re.id())
- .build();
- return cl.describeStatement(describeStatementRequest);
- },
- (cl, re) -> {
- GetStatementResultRequest getStatementRequest = GetStatementResultRequest.builder()
- .id(re.id())
- .build();
- return cl.getStatementResult(getStatementRequest);
- }, retType
- );
- }
-
-
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。