当前位置:   article > 正文

使用 AWS SDK for Java 操作 Amazon Redshift(software.amazon.awssdk)

software.amazon.awssdk


分享不易,希望能收获您的点赞和收藏。

转载请注明出处

引入依赖(下面的片段未声明 version,需要由 software.amazon.awssdk:bom 等依赖管理统一指定版本,否则请显式补充版本号):

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>redshiftdata</artifactId>
</dependency>

 代码:

  1. import cn.hutool.core.collection.CollectionUtil;
  2. import cn.hutool.core.convert.Convert;
  3. import cn.hutool.core.lang.Assert;
  4. import cn.hutool.core.util.StrUtil;
  5. import cn.hutool.extra.spring.SpringUtil;
  6. import com.gz.common.config.RedshiftProperties;
  7. import lombok.Builder;
  8. import lombok.Data;
  9. import lombok.extern.slf4j.Slf4j;
  10. import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
  11. import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
  12. import software.amazon.awssdk.regions.Region;
  13. import software.amazon.awssdk.services.redshiftdata.RedshiftDataClient;
  14. import software.amazon.awssdk.services.redshiftdata.model.*;
  15. import java.util.*;
  16. import java.util.function.BiFunction;
  17. import java.util.function.Supplier;
  18. @Slf4j
  19. @Data
  20. public class RedshiftUtils {
  /**
   * A command that can render itself as a single text statement.
   *
   * <p>The implementations in this file return Redshift {@code COPY} / {@code UNLOAD} SQL.
   */
  public interface Command {

    /**
     * Renders this command.
     *
     * @return the statement text of this command
     */
    String getCommand();
  }
  24. public static class CopyCommand implements Command {
  25. public static final String COPY_COMMAND = "copy {table} from '{s3Url}' delimiter '{delimiter}' iam_role '{iamRole}' {options};";
  26. private String table;
  27. private String s3Url;
  28. private String delimiter = ",";
  29. private String iamRole;
  30. private Map<String, String> options = new HashMap<>();
  31. @Builder
  32. public CopyCommand(String table, String s3Url, String delimiter, String iamRole) {
  33. this.table = table;
  34. this.s3Url = s3Url;
  35. this.delimiter = delimiter;
  36. this.iamRole = iamRole;
  37. }
  38. public Map<String, String> ignoreheaderOpt(Integer num) {
  39. options.put("ignoreheader", num.toString());
  40. return options;
  41. }
  42. public Map<String, String> addOptions(Map<String, String> options) {
  43. this.options.putAll(options);
  44. return options;
  45. }
  46. public String getCommand() {
  47. String command = COPY_COMMAND;
  48. HashMap<String, String> map = new HashMap<>();
  49. StringBuilder optionBuilder = new StringBuilder();
  50. if (CollectionUtil.isNotEmpty(options)) {
  51. options.forEach((k, v) -> {
  52. optionBuilder.append(k).append(" ").append(v).append(" ");
  53. });
  54. map.put("options", optionBuilder.toString());
  55. } else {
  56. command = command.replace("{options}", "");
  57. }
  58. map.put("table", this.table);
  59. map.put("s3Url", this.s3Url);
  60. map.put("delimiter", this.delimiter);
  61. map.put("iamRole", this.iamRole);
  62. return StrUtil.format(command, map);
  63. }
  64. }
  65. public static class UnLoadCommand implements Command {
  66. public static final String UNLOAD_COMMAND = "unload ('{sql}') to '{s3Url}' iam_role '{iamRole}' {options};";
  67. private String sql;
  68. private String s3Url;
  69. private String iamRole;
  70. private Map<String, String> options = new HashMap<>();
  71. @Builder
  72. public UnLoadCommand(String sql, String s3Url, String iamRole) {
  73. this.sql = sql;
  74. this.s3Url = s3Url;
  75. this.iamRole = iamRole;
  76. }
  77. public Map<String, String> addOptions(Map<String, String> options) {
  78. this.options.putAll(options);
  79. return options;
  80. }
  81. public String getCommand() {
  82. String command = UNLOAD_COMMAND;
  83. HashMap<String, String> map = new HashMap<>();
  84. StringBuilder optionBuilder = new StringBuilder();
  85. if (CollectionUtil.isNotEmpty(options)) {
  86. options.forEach((k, v) -> {
  87. optionBuilder.append(k).append(" ").append(v).append(" ");
  88. });
  89. map.put("options", optionBuilder.toString());
  90. } else {
  91. command = command.replace("{options}", "");
  92. }
  93. map.put("sql", this.sql);
  94. map.put("s3Url", this.s3Url);
  95. map.put("iamRole", this.iamRole);
  96. return StrUtil.format(command, map);
  97. }
  98. }
  99. private static String ACCESS_KEY;
  100. private static String SECRE_KEY;
  101. private static String DATABASE;
  102. private static String SECRE_ARN;
  103. private static String GROUP_NAME;
  104. private static Region REGION; // 设置您的区域;
  105. static {
  106. RedshiftProperties redshiftProperties = SpringUtil.getBean(RedshiftProperties.class);
  107. ACCESS_KEY = redshiftProperties.getAccessKey();
  108. SECRE_KEY = redshiftProperties.getSecretkey();
  109. DATABASE = redshiftProperties.getDatabase();
  110. SECRE_ARN = redshiftProperties.getArn();
  111. GROUP_NAME = redshiftProperties.getGroupName();
  112. REGION = Region.regions().stream().filter(e -> e.id().equals(redshiftProperties.getRegion())).findAny().orElseThrow();
  113. }
  114. public static RedshiftDataClient getReshiftDataClient() {
  115. return RedshiftDataClient
  116. .builder()
  117. .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(ACCESS_KEY, SECRE_KEY)))
  118. .region(REGION).build();
  119. }
  120. public static RedshiftDataClient getReshiftDataClient(Region region) {
  121. return RedshiftDataClient
  122. .builder()
  123. .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(ACCESS_KEY, SECRE_KEY)))
  124. .region(region).build();
  125. }
  126. /**
  127. * 执行sql语句
  128. *
  129. * @param sql
  130. * @return null or list
  131. */
  132. public static <T extends RedshiftDataResponse> T executeStatement(String sql, Class<T> retType) {
  133. return execQueryPipeline(Collections.singletonList(sql),
  134. RedshiftUtils::getReshiftDataClient,
  135. RedshiftUtils::executeQueryStatement,
  136. RedshiftUtils::executeDescribeStatement,
  137. RedshiftUtils::executeGetResultStatement,
  138. retType
  139. );
  140. }
  141. /**
  142. * 获取查询结果
  143. *
  144. * @param reshiftClient
  145. * @param excuteResponse
  146. * @return
  147. */
  148. public static GetStatementResultResponse executeGetResultStatement(RedshiftDataClient reshiftClient, ExecuteStatementResponse excuteResponse) {
  149. GetStatementResultRequest getStatementRequest = GetStatementResultRequest.builder()
  150. .id(excuteResponse.id())
  151. .build();
  152. return reshiftClient.getStatementResult(getStatementRequest);
  153. }
  154. /**
  155. * 获取sql语句执行状态和结果
  156. *
  157. * @param reshiftClient
  158. * @param excuteResponse
  159. * @return
  160. */
  161. public static DescribeStatementResponse executeDescribeStatement(RedshiftDataClient reshiftClient, ExecuteStatementResponse excuteResponse) {
  162. DescribeStatementRequest describeStatementRequest = DescribeStatementRequest.builder()
  163. .id(excuteResponse.id())
  164. .build();
  165. return reshiftClient.describeStatement(describeStatementRequest);
  166. }
  167. /**
  168. * 获取sql查询请求的响应
  169. *
  170. * @param reshiftClient
  171. * @param sql
  172. * @return {@link ExecuteStatementResponse}
  173. */
  174. public static ExecuteStatementResponse executeQueryStatement(RedshiftDataClient reshiftClient, List<String> sql) {
  175. ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder().secretArn(SECRE_ARN).workgroupName(GROUP_NAME).database(DATABASE).sql(sql.get(0)).build();
  176. return reshiftClient.executeStatement(statementRequest);
  177. }
  178. /**
  179. * 查询执行管道
  180. *
  181. * @param sql
  182. * @param clientSupply
  183. * @param execueQueryFuntion
  184. * @param executeDescribeFuntion
  185. * @param executeGetResultFuntion
  186. * @param expectRetType 可根据expectRetType进行流程执行控制
  187. * @param <C>
  188. * @param <T>
  189. * @param <G>
  190. * @param <D>
  191. * @param <R>
  192. * @return expectRetType
  193. */
  194. public static <C extends RedshiftDataClient,
  195. T extends RedshiftDataResponse,
  196. G extends GetStatementResultResponse,
  197. D extends DescribeStatementResponse,
  198. R extends RedshiftDataResponse> R execQueryPipeline(List<String> sql,
  199. Supplier<C> clientSupply,
  200. BiFunction<C, List<String>, T> execueQueryFuntion,
  201. BiFunction<C, T, D> executeDescribeFuntion,
  202. BiFunction<C, T, G> executeGetResultFuntion,
  203. Class<R> expectRetType
  204. ) {
  205. R retTypr = null;
  206. C client = clientSupply.get();
  207. try (client) {
  208. T queryResponse = execueQueryFuntion.apply(client, sql);
  209. DescribeStatementResponse describeResponse = executeDescribeFuntion.apply(client, queryResponse);
  210. Assert.isNull(describeResponse.error(), () -> {
  211. log.error("error detail:{}", describeResponse);
  212. return RedshiftDataException.builder().message(describeResponse.error()).build();
  213. });
  214. if (Objects.nonNull(expectRetType) && expectRetType.isAssignableFrom(DescribeStatementResponse.class)) {
  215. retTypr = Convert.convert(expectRetType, describeResponse);
  216. } else if (describeResponse.hasResultSet()) {
  217. GetStatementResultResponse getResultResponse = executeGetResultFuntion.apply(client, queryResponse);
  218. if (Objects.nonNull(expectRetType) && expectRetType.isAssignableFrom(GetStatementResultResponse.class)) {
  219. retTypr = Convert.convert(expectRetType, getResultResponse);
  220. }
  221. }
  222. }
  223. return retTypr;
  224. }
  225. /**
  226. * 执行unload到s3
  227. *
  228. * @return
  229. */
  230. public static DescribeStatementResponse executeUnload(UnLoadCommand unLoadCommand) {
  231. return execQueryPipeline(Collections.singletonList(unLoadCommand.getCommand()),
  232. RedshiftUtils::getReshiftDataClient,
  233. RedshiftUtils::executeQueryStatement,
  234. RedshiftUtils::executeDescribeStatement,
  235. RedshiftUtils::executeGetResultStatement,
  236. DescribeStatementResponse.class
  237. );
  238. }
  239. /**
  240. * 执行copy
  241. *
  242. * @return
  243. */
  244. public static DescribeStatementResponse executeCopy(CopyCommand command) {
  245. return execQueryPipeline(Collections.singletonList(command.getCommand()),
  246. RedshiftUtils::getReshiftDataClient,
  247. RedshiftUtils::executeQueryStatement,
  248. RedshiftUtils::executeDescribeStatement,
  249. RedshiftUtils::executeGetResultStatement,
  250. DescribeStatementResponse.class
  251. );
  252. }
  253. /**
  254. * 批处理
  255. *
  256. * @param sqls
  257. * @return
  258. */
  259. public static <T extends RedshiftDataResponse> T batchExecuteStatement(List<String> sqls, Class<T> retType) {
  260. return execQueryPipeline(
  261. sqls,
  262. RedshiftUtils::getReshiftDataClient,
  263. (cl, s) -> {
  264. BatchExecuteStatementRequest request = BatchExecuteStatementRequest.builder().secretArn(SECRE_ARN).workgroupName(GROUP_NAME).database(DATABASE).sqls(sqls).build();
  265. return cl.batchExecuteStatement(request);
  266. },
  267. (cl, re) -> {
  268. DescribeStatementRequest describeStatementRequest = DescribeStatementRequest.builder()
  269. .id(re.id())
  270. .build();
  271. return cl.describeStatement(describeStatementRequest);
  272. },
  273. (cl, re) -> {
  274. GetStatementResultRequest getStatementRequest = GetStatementResultRequest.builder()
  275. .id(re.id())
  276. .build();
  277. return cl.getStatementResult(getStatementRequest);
  278. }, retType
  279. );
  280. }
  281. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/888754
推荐阅读
相关标签
  

闽ICP备14008679号