赞
踩
代码结构:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; public class HttpClientUtil { public static synchronized String doGet(String httpurl) { HttpURLConnection connection = null; InputStream is = null; BufferedReader br = null; String result = null;// 返回结果字符串 try { // 创建远程url连接对象 URL url = new URL(httpurl); // 通过远程url连接对象打开一个连接,强转成httpURLConnection类 connection = (HttpURLConnection) url.openConnection(); // 设置连接方式:get connection.setRequestMethod("GET"); // 设置连接主机服务器的超时时间:15000毫秒 connection.setConnectTimeout(15000); // 设置读取远程返回的数据时间:60000毫秒 connection.setReadTimeout(60000); // 发送请求 connection.connect(); // 通过connection连接,获取输入流 if (connection.getResponseCode() == 200) { is = connection.getInputStream(); // 封装输入流is,并指定字符集 br = new BufferedReader(new InputStreamReader(is, "UTF-8")); // 存放数据 StringBuffer sbf = new StringBuffer(); String temp = null; while ((temp = br.readLine()) != null) { sbf.append(temp); sbf.append("\r\n"); } result = sbf.toString(); } } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { // 关闭资源 if (null != br) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } if (null != is) { try { is.close(); } catch (IOException e) { e.printStackTrace(); } } connection.disconnect();// 关闭远程连接 } return result; } public static String doPost(String httpUrl, String param) { HttpURLConnection connection = null; InputStream is = null; OutputStream os = null; BufferedReader br = null; String result = null; try { URL url = new URL(httpUrl); // 通过远程url连接对象打开连接 connection = (HttpURLConnection) url.openConnection(); // 设置连接请求方式 connection.setRequestMethod("POST"); // 设置连接主机服务器超时时间:15000毫秒 connection.setConnectTimeout(15000); // 设置读取主机服务器返回数据超时时间:60000毫秒 connection.setReadTimeout(60000); // 默认值为:false,当向远程服务器传送数据/写数据时,需要设置为true connection.setDoOutput(true); // 默认值为:true,当前向远程服务读取数据时,设置为true,该参数可有可无 connection.setDoInput(true); // 设置传入参数的格式:请求参数应该是 name1=value1&name2=value2 的形式。 connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); // 设置鉴权信息:Authorization: Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0 connection.setRequestProperty("Authorization", "Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0"); // 通过连接对象获取一个输出流 os = connection.getOutputStream(); // 通过输出流对象将参数写出去/传输出去,它是通过字节数组写出的 os.write(param.getBytes()); // 通过连接对象获取一个输入流,向远程读取 if (connection.getResponseCode() == 200) { is = connection.getInputStream(); // 对输入流对象进行包装:charset根据工作项目组的要求来设置 br = new BufferedReader(new InputStreamReader(is, "UTF-8")); StringBuffer sbf = new StringBuffer(); String temp = null; // 循环遍历一行一行读取数据 while ((temp = br.readLine()) != null) { sbf.append(temp); sbf.append("\r\n"); } result = sbf.toString(); } } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { // 关闭资源 if (null != br) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } if (null != os) { try { os.close(); } catch (IOException e) { e.printStackTrace(); } } if (null != is) { try { is.close(); } catch (IOException e) { e.printStackTrace(); } } // 断开与远程地址url的连接 connection.disconnect(); } return result; } }
import com.example.demo.module.utils.ThreadPoolUtils; import org.springframework.util.StopWatch; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; public class LatchTest { public static void main(String[] args) throws InterruptedException { // size-请求次数;ThreadPoolUtils.POOL_SIZE - 并发量(操作系统最大线程数+2) int size = 1_000; StopWatch sw = new StopWatch(); sw.start(); ExecutorService pool = ThreadPoolUtils.initPool(); AtomicInteger success = new AtomicInteger(); // 统计成功、失败次数 AtomicInteger fail = new AtomicInteger(); AtomicInteger index = new AtomicInteger(); while (index.get() < size) { int poolSize = size - index.get() > ThreadPoolUtils.POOL_SIZE ? ThreadPoolUtils.POOL_SIZE : size - index.get(); CountDownLatch countDownLatch = new CountDownLatch(poolSize); for (int i = 0; i < poolSize; i++) { int iCounter = index.get(); pool.execute(new Thread(() -> { try { // TODO 处理业务 String result = HttpClientUtil.doGet("http://localhost:8081/login"); System.out.println(String.format(">>>>>>>>>> %s. result: %s", iCounter, result)); success.incrementAndGet(); } catch (Exception e) { e.printStackTrace(); fail.incrementAndGet(); } finally { countDownLatch.countDown(); } })); index.incrementAndGet(); } countDownLatch.await(); } ThreadPoolUtils.shutdownPool(pool); sw.stop(); String minutes = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()/60)).setScale(0, RoundingMode.DOWN).toString(); String seconds = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()%60)).setScale(0, RoundingMode.HALF_UP).toString(); System.out.println(String.format("成功次数:%s;失败次数:%s", success, fail)); System.out.println(String.format("耗时: %sm%ss", minutes, seconds)); } }
import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * <p> @Title ThreadPoolUtils * <p> @Description 线程池工具类 * * @author ACGkaka * @date 2021/1/19 11:22 */ @Slf4j public class ThreadPoolUtils { /** 最佳线程数: 操作系统最大线程数+2 */ public static final int POOL_SIZE = Runtime.getRuntime().availableProcessors() + 2; /** 线程池关闭前等待时长 */ private static final int AWAIT_TIME = 5_000; /** * 初始化线程池 * * @return 线程池 */ public static ExecutorService initPool() { return initPool("pool-%d"); } /** * 初始化线程池 - 重命名线程 * * @param nameFormat 线程命名格式,例如:CustHis96Lc-pool-%d * @return 线程池 */ public static ExecutorService initPool(String nameFormat) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(nameFormat).build(); return new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); } /** * 关闭线程池 * shutdown() 只起到通知的作用,需要进一步保证线程池关闭 * * @param pool 线程池 */ public static void shutdownPool(ExecutorService pool) { try { pool.shutdown(); // 所有的任务都结束的时候,返回true if (pool.awaitTermination(AWAIT_TIME, TimeUnit.MILLISECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。 log.info("awaitTermination: " + e.getMessage()); pool.shutdownNow(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。