赞
踩
前言:
在使用fastDFS文件存储的时候,使用过程中会时不时出现 connect timed out,一部分原因是因为配置。
因此为应对此场景,创建连接池进行心跳监控保证长链接。
1.创建fastdfs连接池:
- /**
- * @Description: fastdfs连接池
- * @version 1.0
- * @Description
- */
- public class ConnectionPool {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(ConnectionPool.class);
- /**
- * 空闲的连接池
- */
- private LinkedBlockingQueue<TrackerServer> idleConnectionPool = null;
- /**
- * 连接池默认最小连接数
- */
- private long minPoolSize = 10;
- /**
- * 连接池默认最大连接数
- */
- private long maxPoolSize = 30;
- /**
- * 当前创建的连接数
- */
- private volatile long nowPoolSize = 0;
- /**
- * 默认等待时间(单位:秒)
- */
- private long waitTimes = 1200;
- /**
- * fastdfs客户端创建连接默认1次
- */
- private static final int COUNT = 2;
-
- /**
- * 默认构造方法
- */
- public ConnectionPool(long minPoolSize, long maxPoolSize, long waitTimes) {
- String logId = UUID.randomUUID().toString();
- LOGGER.info("[线程池构造方法(ConnectionPool)][" + logId
- + "][默认参数:minPoolSize=" + minPoolSize + ",maxPoolSize="
- + maxPoolSize + ",waitTimes=" + waitTimes + "]");
- this.minPoolSize = minPoolSize;
- this.maxPoolSize = maxPoolSize;
- this.waitTimes = waitTimes;
- /** 初始化连接池 */
- poolInit(logId);
- /** 注册心跳 */
- HeartBeat beat = new HeartBeat(this);
- beat.beat();
- }
-
- /**
- * @Description: 连接池初始化 (在加载当前ConnectionPool时执行) 1).加载配置文件 2).空闲连接池初始化;
- * 3).创建最小连接数的连接,并放入到空闲连接池;
- */
- private void poolInit(String logId) {
- try {
- /** 加载配置文件 */
- initClientGlobal();
- /** 初始化空闲连接池 */
- idleConnectionPool = new LinkedBlockingQueue<TrackerServer>();
- /** 往线程池中添加默认大小的线程 */
- for (int i = 0; i < minPoolSize; i++) {
- createTrackerServer(logId, COUNT);
- }
- } catch (Exception e) {
- LOGGER.error("[FASTDFS初始化(init)--异常][" + logId + "][异常:{}]", e);
- }
- }
-
- /**
- * @Description: 创建TrackerServer, 并放入空闲连接池
- */
- public void createTrackerServer(String logId, int flag) {
-
- // LOGGER.info("[创建TrackerServer(createTrackerServer)][" + logId + "]");
- TrackerServer trackerServer = null;
-
- try {
-
- TrackerClient trackerClient = new TrackerClient();
- trackerServer = trackerClient.getConnection();
- while (trackerServer == null && flag < 5) {
- LOGGER.info("[创建TrackerServer(createTrackerServer)][" + logId
- + "][第" + flag + "次重建]");
- flag++;
- initClientGlobal();
- trackerServer = trackerClient.getConnection();
- }
- org.csource.fastdfs.ProtoCommon.activeTest(trackerServer
- .getSocket());
- idleConnectionPool.add(trackerServer);
- /** 同一时间只允许一个线程对nowPoolSize操作 **/
- synchronized (this) {
- nowPoolSize++;
- }
-
- } catch (Exception e) {
-
- LOGGER.error("[创建TrackerServer(createTrackerServer)][" + logId
- + "][异常:{}]", e);
-
- } finally {
-
- if (trackerServer != null) {
- try {
- trackerServer.close();
- } catch (Exception e) {
- LOGGER.error("[创建TrackerServer(createTrackerServer)--关闭trackerServer异常]["
- + logId + "][异常:{}]", e);
- }
- }
-
- }
- }
-
- /**
- * @throws AppException
- * @Description: 获取空闲连接 1).在空闲池(idleConnectionPool)中弹出一个连接;
- * 2).把该连接放入忙碌池(busyConnectionPool)中; 3).返回 connection
- * 4).如果没有idle connection, 等待 wait_time秒, and check again
- */
- public TrackerServer checkout(String logId) throws AppException {
-
- LOGGER.info("[获取空闲连接(checkout)][" + logId + "]");
- TrackerServer trackerServer = idleConnectionPool.poll();
-
- if (trackerServer == null) {
-
- if (nowPoolSize < maxPoolSize) {
- createTrackerServer(logId, COUNT);
- try {
- trackerServer = idleConnectionPool.poll(waitTimes,
- TimeUnit.SECONDS);
- } catch (Exception e) {
- LOGGER.error("[获取空闲连接(checkout)-error][" + logId
- + "][error:获取连接超时:{}]", e);
- throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
- }
- }
- if (trackerServer == null) {
- LOGGER.error("[获取空闲连接(checkout)-error][" + logId
- + "][error:获取连接超时(" + waitTimes + "s)]");
- throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
- }
-
- }
- LOGGER.info("[获取空闲连接(checkout)][" + logId + "][获取空闲连接成功]");
- return trackerServer;
-
- }
-
- /**
- * @param trackerServer 需释放的连接对象
- * @Description: 释放繁忙连接 1.如果空闲池的连接小于最小连接值,就把当前连接放入idleConnectionPool;
- * 2.如果空闲池的连接等于或大于最小连接值,就把当前释放连接丢弃;
- */
-
- public void checkin(TrackerServer trackerServer, String logId) {
-
- LOGGER.info("[释放当前连接(checkin)][" + logId + "][prams:" + trackerServer
- + "] ");
- if (trackerServer != null) {
- if (idleConnectionPool.size() < minPoolSize) {
- idleConnectionPool.add(trackerServer);
- } else {
- synchronized (this) {
- if (nowPoolSize != 0) {
- nowPoolSize--;
- }
- }
- }
- }
-
- }
-
- /**
- * @param trackerServer
- * @Description: 删除不可用的连接,并把当前连接数减一(调用过程中trackerServer报异常,调用一般在finally中)
- */
- public void drop(TrackerServer trackerServer, String logId) {
- LOGGER.info("[删除不可用连接方法(drop)][" + logId + "][parms:" + trackerServer
- + "] ");
- if (trackerServer != null) {
- try {
- synchronized (this) {
- if (nowPoolSize != 0) {
- nowPoolSize--;
- }
- }
- trackerServer.close();
- } catch (IOException e) {
- LOGGER.info("[删除不可用连接方法(drop)--关闭trackerServer异常][" + logId
- + "][异常:{}]", e);
- }
- }
- }
-
- private void initClientGlobal() throws Exception {
- ClientGlobal.init(this.getClass().getResource("/").getPath()+"deploy/fastdfs_client.conf");
- }
-
- public LinkedBlockingQueue<TrackerServer> getIdleConnectionPool() {
- return idleConnectionPool;
- }
-
- public long getMinPoolSize() {
- return minPoolSize;
- }
-
- public void setMinPoolSize(long minPoolSize) {
- if (minPoolSize != 0) {
- this.minPoolSize = minPoolSize;
- }
- }
-
- public long getMaxPoolSize() {
- return maxPoolSize;
- }
-
- public void setMaxPoolSize(long maxPoolSize) {
- if (maxPoolSize != 0) {
- this.maxPoolSize = maxPoolSize;
- }
- }
-
- public long getWaitTimes() {
- return waitTimes;
- }
-
- public void setWaitTimes(int waitTimes) {
- if (waitTimes != 0) {
- this.waitTimes = waitTimes;
- }
- }
- }

2.连接池定时器设置:
- /**
- * 连接池定时器设置
- * @version 1.0
- * @Description
- */
- public class HeartBeat {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(HeartBeat.class);
- /**
- * fastdfs连接池
- */
- private ConnectionPool pool = null;
- /**
- * 小时毫秒数
- */
- public static int ahour = 1000 * 60 * 60 * 1;
- /**
- * 等待时间
- */
- public static int waitTimes = 1200;
-
- public HeartBeat(ConnectionPool pool) {
- this.pool = pool;
- }
-
- /**
- * @Description: 定时执行任务,检测当前的空闲连接是否可用,如果不可用将从连接池中移除
- */
- public void beat() {
- LOGGER.info("[心跳任务方法(beat)]");
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- String logId = UUID.randomUUID().toString();
- LOGGER.info("[心跳任务方法(beat)]["
- + logId
- + "][Description:对idleConnectionPool中的TrackerServer进行监测]");
- LinkedBlockingQueue<TrackerServer> idleConnectionPool = pool
- .getIdleConnectionPool();
- TrackerServer ts = null;
- for (int i = 0; i < idleConnectionPool.size(); i++) {
- try {
- ts = idleConnectionPool.poll(waitTimes,
- TimeUnit.SECONDS);
- if (ts != null) {
- org.csource.fastdfs.ProtoCommon.activeTest(ts
- .getSocket());
- idleConnectionPool.add(ts);
- } else {
- /** 代表已经没有空闲长连接 */
- break;
- }
- } catch (Exception e) {
- /** 发生异常,要删除,进行重建 */
- LOGGER.error("[心跳任务方法(beat)][" + logId
- + "][异常:当前连接已不可用将进行重新获取连接]");
- pool.drop(ts, logId);
- }
- }
- }
- };
- Timer timer = new Timer();
- timer.schedule(task, ahour, ahour);
- }
- }

3.工具类实现文件的上传下载:
- /**
- * fastdfs文件操作工具类 1).初始化连接池; 2).实现文件的上传与下载;
- * @version 1.0
- * @Description
- */
- public class FastDfsUtil {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(FastDfsUtil.class);
- /**
- * 连接池
- */
- private static ConnectionPool connectionPool = null;
- /**
- * 连接池默认最小连接数
- */
- private long minPoolSize = 10;
- /**
- * 连接池默认最大连接数
- */
- private long maxPoolSize = 30;
- /**
- * 当前创建的连接数
- */
- private volatile long nowPoolSize = 0;
- /**
- * 默认等待时间(单位:秒)
- */
- private long waitTimes = 1200;
-
- /**
- * 初始化线程池
- *
- * @Description:
- */
- public void init() {
- String logId = UUID.randomUUID().toString();
- LOGGER.info("[初始化线程池(Init)][" + logId + "][默认参数:minPoolSize="
- + minPoolSize + ",maxPoolSize=" + maxPoolSize + ",waitTimes="
- + waitTimes + "]");
- connectionPool = new ConnectionPool(minPoolSize, maxPoolSize, waitTimes);
- }
-
- /**
- * @param groupName 组名如group0
- * @param fileBytes 文件字节数组
- * @param extName 文件扩展名:如png
- * @param linkUrl 访问地址:http://image.xxx.com
- * @return 图片上传成功后地址
- * @throws AppException
- * @Description: TODO(这里用一句话描述这个方法的作用)
- */
- public String upload(String groupName, byte[] fileBytes, String extName,
- String linkUrl) throws AppException {
- String logId = UUID.randomUUID().toString();
- /** 封装文件信息参数 */
- NameValuePair[] metaList = new NameValuePair[]{new NameValuePair(
- "fileName", "")};
- TrackerServer trackerServer = null;
- try {
-
- /** 获取fastdfs服务器连接 */
- trackerServer = connectionPool.checkout(logId);
- StorageServer storageServer = null;
- StorageClient1 client1 = new StorageClient1(trackerServer,
- storageServer);
-
- /** 以文件字节的方式上传 */
- String[] results = client1.upload_file(groupName, fileBytes, extName, metaList);
-
- /** 上传完毕及时释放连接 */
- connectionPool.checkin(trackerServer, logId);
-
- /** results[0]:组名,results[1]:远程文件名 */
- if (results != null && results.length == 2) {
- String result = results[0] + "/" + results[1];
-
- LOGGER.info("[上传文件(upload)-fastdfs服务器相应结果][" + logId
- + "][result:" + result + "]");
-
- if (!StringUtils.isEmpty(linkUrl))
- return linkUrl + "/" + result;
- else {
- return result;
- }
- } else {
- /** 文件系统上传返回结果错误 */
- throw ERRORS.UPLOAD_RESULT_ERROR.ERROR();
- }
- } catch (AppException e) {
-
- LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
- throw e;
-
- } catch (SocketTimeoutException e) {
- LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
- throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
- } catch (Exception e) {
-
- LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
- connectionPool.drop(trackerServer, logId);
- throw ERRORS.SYS_ERROR.ERROR();
-
- }
-
- }
-
- /**
- * @param groupName 组名如group0
- * @param fileBytes 文件字节数组
- * @param extName 文件扩展名:如png
- * @param linkUrl 访问地址:http://image.xxx.com
- * @return 图片上传成功后地址
- * @throws AppException
- * @Description: TODO(这里用一句话描述这个方法的作用)
- */
- public static String upload(byte[] fileBytes, String extName,
- String linkUrl) throws AppException {
- String logId = UUID.randomUUID().toString();
- /** 封装文件信息参数 */
- NameValuePair[] metaList = new NameValuePair[]{new NameValuePair(
- "fileName", "")};
- TrackerServer trackerServer = null;
- try {
-
- /** 获取fastdfs服务器连接 */
- trackerServer = connectionPool.checkout(logId);
- StorageServer storageServer = null;
- StorageClient1 client1 = new StorageClient1(trackerServer,
- storageServer);
-
- /** 以文件字节的方式上传 */
- String[] results = client1.upload_file(fileBytes,extName, metaList);
-
- /** 上传完毕及时释放连接 */
- connectionPool.checkin(trackerServer, logId);
-
- /** results[0]:组名,results[1]:远程文件名 */
- if (results != null && results.length == 2) {
- String result= results[0] + "/" + results[1];
-
- LOGGER.info("[上传文件(upload)-fastdfs服务器相应结果][" + logId
- + "][result:" + result + "]");
-
- if (!StringUtils.isEmpty(linkUrl))
- return linkUrl + "/" + result;
- else {
- return result;
- }
- } else {
- /** 文件系统上传返回结果错误 */
- throw ERRORS.UPLOAD_RESULT_ERROR.ERROR();
- }
- } catch (AppException e) {
-
- LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
- throw e;
-
- } catch (SocketTimeoutException e) {
- LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
- throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
- } catch (Exception e) {
-
- LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
- connectionPool.drop(trackerServer, logId);
- throw ERRORS.SYS_ERROR.ERROR();
-
- }
-
- }
-
- /**
- * @param group_name 组名
- * @param remote_filename 远程文件名称
- * @throws AppException
- * @Description: 删除fastdfs服务器中文件
- */
- public void deleteFile(String group_name, String remote_filename)
- throws AppException {
-
- String logId = UUID.randomUUID().toString();
- LOGGER.info("[ 删除文件(deleteFile)][" + logId + "][parms:group_name="
- + group_name + ",remote_filename=" + remote_filename + "]");
- TrackerServer trackerServer = null;
-
- try {
- /** 获取可用的tracker,并创建存储server */
- trackerServer = connectionPool.checkout(logId);
- StorageServer storageServer = null;
- StorageClient1 client1 = new StorageClient1(trackerServer,
- storageServer);
- /** 删除文件,并释放 trackerServer */
- //int result = client1.delete_file(group_name, remote_filename);
- int result = client1.delete_file1(remote_filename);
-
- /** 上传完毕及时释放连接 */
- connectionPool.checkin(trackerServer, logId);
-
- LOGGER.info("[ 删除文件(deleteFile)--调用fastdfs客户端返回结果][" + logId
- + "][results:result=" + result + "]");
-
- //** 0:文件删除成功,2:文件不存在 ,其它:文件删除出错 *//*
- if (result == 2) {
-
- throw ERRORS.NOT_EXIST_FILE.ERROR();
-
- } else if (result != 0) {
-
- throw ERRORS.DELETE_RESULT_ERROR.ERROR();
-
- }
-
- } catch (AppException e) {
-
- LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
- throw e;
-
- } catch (SocketTimeoutException e) {
- LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
- throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
- } catch (Exception e) {
-
- LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
- connectionPool.drop(trackerServer, logId);
- throw ERRORS.SYS_ERROR.ERROR();
-
- }
- }
-
- /**
- * @param fileld 远程文件名称
- * @throws AppException
- * @Description: 删除fastdfs服务器中文件
- */
- public boolean delFile(String fileld)
- throws AppException {
- int count = 0;
- String logId = UUID.randomUUID().toString();
- LOGGER.info("[ 删除文件(deleteFile)][" + logId + "],remote_filename=" + fileld + "]");
- TrackerServer trackerServer = null;
-
- try {
- /** 获取可用的tracker,并创建存储server */
- trackerServer = connectionPool.checkout(logId);
- StorageServer storageServer = null;
- StorageClient1 client1 = new StorageClient1(trackerServer,
- storageServer);
- /** 删除文件,并释放 trackerServer */
- count = client1.delete_file1(fileld);
-
- /** 上传完毕及时释放连接 */
- connectionPool.checkin(trackerServer, logId);
-
- LOGGER.info("[ 删除文件(deleteFile)--调用fastdfs客户端返回结果][" + logId
- + "][results:result=" + count + "]");
-
- } catch (AppException e) {
-
- LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
- throw e;
-
- } catch (SocketTimeoutException e) {
- LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
- throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
- } catch (Exception e) {
-
- LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
- connectionPool.drop(trackerServer, logId);
- throw ERRORS.SYS_ERROR.ERROR();
-
- }
- return count == 0 ? true : false;
- }
-
- public ConnectionPool getConnectionPool() {
- return connectionPool;
- }
-
- public void setConnectionPool(ConnectionPool connectionPool) {
- this.connectionPool = connectionPool;
- }
-
- public long getMinPoolSize() {
- return minPoolSize;
- }
-
- public void setMinPoolSize(long minPoolSize) {
- this.minPoolSize = minPoolSize;
- }
-
- public long getMaxPoolSize() {
- return maxPoolSize;
- }
-
- public void setMaxPoolSize(long maxPoolSize) {
- this.maxPoolSize = maxPoolSize;
- }
-
- public long getNowPoolSize() {
- return nowPoolSize;
- }
-
- public void setNowPoolSize(long nowPoolSize) {
- this.nowPoolSize = nowPoolSize;
- }
-
- public long getWaitTimes() {
- return waitTimes;
- }
-
- public void setWaitTimes(long waitTimes) {
- this.waitTimes = waitTimes;
- }
- }

以上就是fastdfs连接池配置。查看日志:
对于fastDFS安装请参考:https://blog.csdn.net/u011663149/article/details/85321181
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。