当前位置:   article > 正文

FastDFS连接池实现文件上传并监控TrackerServer心跳_fastdfs 监控

fastdfs 监控

前言:

        在使用fastDFS文件存储的时候,使用过程中会时不时出现 connect timed out,一部分原因是因为配置。

        因此为应对此场景,创建连接池进行心跳监控保证长链接。

1.创建fastdfs连接池:

  1. /**
  2. * @Description: fastdfs连接池
  3. * @version 1.0
  4. * @Description
  5. */
  6. public class ConnectionPool {
  7. private static final Logger LOGGER = LoggerFactory
  8. .getLogger(ConnectionPool.class);
  9. /**
  10. * 空闲的连接池
  11. */
  12. private LinkedBlockingQueue<TrackerServer> idleConnectionPool = null;
  13. /**
  14. * 连接池默认最小连接数
  15. */
  16. private long minPoolSize = 10;
  17. /**
  18. * 连接池默认最大连接数
  19. */
  20. private long maxPoolSize = 30;
  21. /**
  22. * 当前创建的连接数
  23. */
  24. private volatile long nowPoolSize = 0;
  25. /**
  26. * 默认等待时间(单位:秒)
  27. */
  28. private long waitTimes = 1200;
  29. /**
  30. * fastdfs客户端创建连接默认1次
  31. */
  32. private static final int COUNT = 2;
  33. /**
  34. * 默认构造方法
  35. */
  36. public ConnectionPool(long minPoolSize, long maxPoolSize, long waitTimes) {
  37. String logId = UUID.randomUUID().toString();
  38. LOGGER.info("[线程池构造方法(ConnectionPool)][" + logId
  39. + "][默认参数:minPoolSize=" + minPoolSize + ",maxPoolSize="
  40. + maxPoolSize + ",waitTimes=" + waitTimes + "]");
  41. this.minPoolSize = minPoolSize;
  42. this.maxPoolSize = maxPoolSize;
  43. this.waitTimes = waitTimes;
  44. /** 初始化连接池 */
  45. poolInit(logId);
  46. /** 注册心跳 */
  47. HeartBeat beat = new HeartBeat(this);
  48. beat.beat();
  49. }
  50. /**
  51. * @Description: 连接池初始化 (在加载当前ConnectionPool时执行) 1).加载配置文件 2).空闲连接池初始化;
  52. * 3).创建最小连接数的连接,并放入到空闲连接池;
  53. */
  54. private void poolInit(String logId) {
  55. try {
  56. /** 加载配置文件 */
  57. initClientGlobal();
  58. /** 初始化空闲连接池 */
  59. idleConnectionPool = new LinkedBlockingQueue<TrackerServer>();
  60. /** 往线程池中添加默认大小的线程 */
  61. for (int i = 0; i < minPoolSize; i++) {
  62. createTrackerServer(logId, COUNT);
  63. }
  64. } catch (Exception e) {
  65. LOGGER.error("[FASTDFS初始化(init)--异常][" + logId + "][异常:{}]", e);
  66. }
  67. }
  68. /**
  69. * @Description: 创建TrackerServer, 并放入空闲连接池
  70. */
  71. public void createTrackerServer(String logId, int flag) {
  72. // LOGGER.info("[创建TrackerServer(createTrackerServer)][" + logId + "]");
  73. TrackerServer trackerServer = null;
  74. try {
  75. TrackerClient trackerClient = new TrackerClient();
  76. trackerServer = trackerClient.getConnection();
  77. while (trackerServer == null && flag < 5) {
  78. LOGGER.info("[创建TrackerServer(createTrackerServer)][" + logId
  79. + "][第" + flag + "次重建]");
  80. flag++;
  81. initClientGlobal();
  82. trackerServer = trackerClient.getConnection();
  83. }
  84. org.csource.fastdfs.ProtoCommon.activeTest(trackerServer
  85. .getSocket());
  86. idleConnectionPool.add(trackerServer);
  87. /** 同一时间只允许一个线程对nowPoolSize操作 **/
  88. synchronized (this) {
  89. nowPoolSize++;
  90. }
  91. } catch (Exception e) {
  92. LOGGER.error("[创建TrackerServer(createTrackerServer)][" + logId
  93. + "][异常:{}]", e);
  94. } finally {
  95. if (trackerServer != null) {
  96. try {
  97. trackerServer.close();
  98. } catch (Exception e) {
  99. LOGGER.error("[创建TrackerServer(createTrackerServer)--关闭trackerServer异常]["
  100. + logId + "][异常:{}]", e);
  101. }
  102. }
  103. }
  104. }
  105. /**
  106. * @throws AppException
  107. * @Description: 获取空闲连接 1).在空闲池(idleConnectionPool)中弹出一个连接;
  108. * 2).把该连接放入忙碌池(busyConnectionPool)中; 3).返回 connection
  109. * 4).如果没有idle connection, 等待 wait_time秒, and check again
  110. */
  111. public TrackerServer checkout(String logId) throws AppException {
  112. LOGGER.info("[获取空闲连接(checkout)][" + logId + "]");
  113. TrackerServer trackerServer = idleConnectionPool.poll();
  114. if (trackerServer == null) {
  115. if (nowPoolSize < maxPoolSize) {
  116. createTrackerServer(logId, COUNT);
  117. try {
  118. trackerServer = idleConnectionPool.poll(waitTimes,
  119. TimeUnit.SECONDS);
  120. } catch (Exception e) {
  121. LOGGER.error("[获取空闲连接(checkout)-error][" + logId
  122. + "][error:获取连接超时:{}]", e);
  123. throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
  124. }
  125. }
  126. if (trackerServer == null) {
  127. LOGGER.error("[获取空闲连接(checkout)-error][" + logId
  128. + "][error:获取连接超时(" + waitTimes + "s)]");
  129. throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
  130. }
  131. }
  132. LOGGER.info("[获取空闲连接(checkout)][" + logId + "][获取空闲连接成功]");
  133. return trackerServer;
  134. }
  135. /**
  136. * @param trackerServer 需释放的连接对象
  137. * @Description: 释放繁忙连接 1.如果空闲池的连接小于最小连接值,就把当前连接放入idleConnectionPool;
  138. * 2.如果空闲池的连接等于或大于最小连接值,就把当前释放连接丢弃;
  139. */
  140. public void checkin(TrackerServer trackerServer, String logId) {
  141. LOGGER.info("[释放当前连接(checkin)][" + logId + "][prams:" + trackerServer
  142. + "] ");
  143. if (trackerServer != null) {
  144. if (idleConnectionPool.size() < minPoolSize) {
  145. idleConnectionPool.add(trackerServer);
  146. } else {
  147. synchronized (this) {
  148. if (nowPoolSize != 0) {
  149. nowPoolSize--;
  150. }
  151. }
  152. }
  153. }
  154. }
  155. /**
  156. * @param trackerServer
  157. * @Description: 删除不可用的连接,并把当前连接数减一(调用过程中trackerServer报异常,调用一般在finally中)
  158. */
  159. public void drop(TrackerServer trackerServer, String logId) {
  160. LOGGER.info("[删除不可用连接方法(drop)][" + logId + "][parms:" + trackerServer
  161. + "] ");
  162. if (trackerServer != null) {
  163. try {
  164. synchronized (this) {
  165. if (nowPoolSize != 0) {
  166. nowPoolSize--;
  167. }
  168. }
  169. trackerServer.close();
  170. } catch (IOException e) {
  171. LOGGER.info("[删除不可用连接方法(drop)--关闭trackerServer异常][" + logId
  172. + "][异常:{}]", e);
  173. }
  174. }
  175. }
  176. private void initClientGlobal() throws Exception {
  177. ClientGlobal.init(this.getClass().getResource("/").getPath()+"deploy/fastdfs_client.conf");
  178. }
  179. public LinkedBlockingQueue<TrackerServer> getIdleConnectionPool() {
  180. return idleConnectionPool;
  181. }
  182. public long getMinPoolSize() {
  183. return minPoolSize;
  184. }
  185. public void setMinPoolSize(long minPoolSize) {
  186. if (minPoolSize != 0) {
  187. this.minPoolSize = minPoolSize;
  188. }
  189. }
  190. public long getMaxPoolSize() {
  191. return maxPoolSize;
  192. }
  193. public void setMaxPoolSize(long maxPoolSize) {
  194. if (maxPoolSize != 0) {
  195. this.maxPoolSize = maxPoolSize;
  196. }
  197. }
  198. public long getWaitTimes() {
  199. return waitTimes;
  200. }
  201. public void setWaitTimes(int waitTimes) {
  202. if (waitTimes != 0) {
  203. this.waitTimes = waitTimes;
  204. }
  205. }
  206. }

2.连接池定时器设置:

  1. /**
  2. * 连接池定时器设置
  3. * @version 1.0
  4. * @Description
  5. */
  6. public class HeartBeat {
  7. private static final Logger LOGGER = LoggerFactory
  8. .getLogger(HeartBeat.class);
  9. /**
  10. * fastdfs连接池
  11. */
  12. private ConnectionPool pool = null;
  13. /**
  14. * 小时毫秒数
  15. */
  16. public static int ahour = 1000 * 60 * 60 * 1;
  17. /**
  18. * 等待时间
  19. */
  20. public static int waitTimes = 1200;
  21. public HeartBeat(ConnectionPool pool) {
  22. this.pool = pool;
  23. }
  24. /**
  25. * @Description: 定时执行任务,检测当前的空闲连接是否可用,如果不可用将从连接池中移除
  26. */
  27. public void beat() {
  28. LOGGER.info("[心跳任务方法(beat)]");
  29. TimerTask task = new TimerTask() {
  30. @Override
  31. public void run() {
  32. String logId = UUID.randomUUID().toString();
  33. LOGGER.info("[心跳任务方法(beat)]["
  34. + logId
  35. + "][Description:对idleConnectionPool中的TrackerServer进行监测]");
  36. LinkedBlockingQueue<TrackerServer> idleConnectionPool = pool
  37. .getIdleConnectionPool();
  38. TrackerServer ts = null;
  39. for (int i = 0; i < idleConnectionPool.size(); i++) {
  40. try {
  41. ts = idleConnectionPool.poll(waitTimes,
  42. TimeUnit.SECONDS);
  43. if (ts != null) {
  44. org.csource.fastdfs.ProtoCommon.activeTest(ts
  45. .getSocket());
  46. idleConnectionPool.add(ts);
  47. } else {
  48. /** 代表已经没有空闲长连接 */
  49. break;
  50. }
  51. } catch (Exception e) {
  52. /** 发生异常,要删除,进行重建 */
  53. LOGGER.error("[心跳任务方法(beat)][" + logId
  54. + "][异常:当前连接已不可用将进行重新获取连接]");
  55. pool.drop(ts, logId);
  56. }
  57. }
  58. }
  59. };
  60. Timer timer = new Timer();
  61. timer.schedule(task, ahour, ahour);
  62. }
  63. }

3.工具类实现文件的上传下载:

  1. /**
  2. * fastdfs文件操作工具类 1).初始化连接池; 2).实现文件的上传与下载;
  3. * @version 1.0
  4. * @Description
  5. */
  6. public class FastDfsUtil {
  7. private static final Logger LOGGER = LoggerFactory
  8. .getLogger(FastDfsUtil.class);
  9. /**
  10. * 连接池
  11. */
  12. private static ConnectionPool connectionPool = null;
  13. /**
  14. * 连接池默认最小连接数
  15. */
  16. private long minPoolSize = 10;
  17. /**
  18. * 连接池默认最大连接数
  19. */
  20. private long maxPoolSize = 30;
  21. /**
  22. * 当前创建的连接数
  23. */
  24. private volatile long nowPoolSize = 0;
  25. /**
  26. * 默认等待时间(单位:秒)
  27. */
  28. private long waitTimes = 1200;
  29. /**
  30. * 初始化线程池
  31. *
  32. * @Description:
  33. */
  34. public void init() {
  35. String logId = UUID.randomUUID().toString();
  36. LOGGER.info("[初始化线程池(Init)][" + logId + "][默认参数:minPoolSize="
  37. + minPoolSize + ",maxPoolSize=" + maxPoolSize + ",waitTimes="
  38. + waitTimes + "]");
  39. connectionPool = new ConnectionPool(minPoolSize, maxPoolSize, waitTimes);
  40. }
  41. /**
  42. * @param groupName 组名如group0
  43. * @param fileBytes 文件字节数组
  44. * @param extName 文件扩展名:如png
  45. * @param linkUrl 访问地址:http://image.xxx.com
  46. * @return 图片上传成功后地址
  47. * @throws AppException
  48. * @Description: TODO(这里用一句话描述这个方法的作用)
  49. */
  50. public String upload(String groupName, byte[] fileBytes, String extName,
  51. String linkUrl) throws AppException {
  52. String logId = UUID.randomUUID().toString();
  53. /** 封装文件信息参数 */
  54. NameValuePair[] metaList = new NameValuePair[]{new NameValuePair(
  55. "fileName", "")};
  56. TrackerServer trackerServer = null;
  57. try {
  58. /** 获取fastdfs服务器连接 */
  59. trackerServer = connectionPool.checkout(logId);
  60. StorageServer storageServer = null;
  61. StorageClient1 client1 = new StorageClient1(trackerServer,
  62. storageServer);
  63. /** 以文件字节的方式上传 */
  64. String[] results = client1.upload_file(groupName, fileBytes, extName, metaList);
  65. /** 上传完毕及时释放连接 */
  66. connectionPool.checkin(trackerServer, logId);
  67. /** results[0]:组名,results[1]:远程文件名 */
  68. if (results != null && results.length == 2) {
  69. String result = results[0] + "/" + results[1];
  70. LOGGER.info("[上传文件(upload)-fastdfs服务器相应结果][" + logId
  71. + "][result:" + result + "]");
  72. if (!StringUtils.isEmpty(linkUrl))
  73. return linkUrl + "/" + result;
  74. else {
  75. return result;
  76. }
  77. } else {
  78. /** 文件系统上传返回结果错误 */
  79. throw ERRORS.UPLOAD_RESULT_ERROR.ERROR();
  80. }
  81. } catch (AppException e) {
  82. LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
  83. throw e;
  84. } catch (SocketTimeoutException e) {
  85. LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
  86. throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
  87. } catch (Exception e) {
  88. LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
  89. connectionPool.drop(trackerServer, logId);
  90. throw ERRORS.SYS_ERROR.ERROR();
  91. }
  92. }
  93. /**
  94. * @param groupName 组名如group0
  95. * @param fileBytes 文件字节数组
  96. * @param extName 文件扩展名:如png
  97. * @param linkUrl 访问地址:http://image.xxx.com
  98. * @return 图片上传成功后地址
  99. * @throws AppException
  100. * @Description: TODO(这里用一句话描述这个方法的作用)
  101. */
  102. public static String upload(byte[] fileBytes, String extName,
  103. String linkUrl) throws AppException {
  104. String logId = UUID.randomUUID().toString();
  105. /** 封装文件信息参数 */
  106. NameValuePair[] metaList = new NameValuePair[]{new NameValuePair(
  107. "fileName", "")};
  108. TrackerServer trackerServer = null;
  109. try {
  110. /** 获取fastdfs服务器连接 */
  111. trackerServer = connectionPool.checkout(logId);
  112. StorageServer storageServer = null;
  113. StorageClient1 client1 = new StorageClient1(trackerServer,
  114. storageServer);
  115. /** 以文件字节的方式上传 */
  116. String[] results = client1.upload_file(fileBytes,extName, metaList);
  117. /** 上传完毕及时释放连接 */
  118. connectionPool.checkin(trackerServer, logId);
  119. /** results[0]:组名,results[1]:远程文件名 */
  120. if (results != null && results.length == 2) {
  121. String result= results[0] + "/" + results[1];
  122. LOGGER.info("[上传文件(upload)-fastdfs服务器相应结果][" + logId
  123. + "][result:" + result + "]");
  124. if (!StringUtils.isEmpty(linkUrl))
  125. return linkUrl + "/" + result;
  126. else {
  127. return result;
  128. }
  129. } else {
  130. /** 文件系统上传返回结果错误 */
  131. throw ERRORS.UPLOAD_RESULT_ERROR.ERROR();
  132. }
  133. } catch (AppException e) {
  134. LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
  135. throw e;
  136. } catch (SocketTimeoutException e) {
  137. LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
  138. throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
  139. } catch (Exception e) {
  140. LOGGER.error("[上传文件(upload)][" + logId + "][异常:" + e + "]");
  141. connectionPool.drop(trackerServer, logId);
  142. throw ERRORS.SYS_ERROR.ERROR();
  143. }
  144. }
  145. /**
  146. * @param group_name 组名
  147. * @param remote_filename 远程文件名称
  148. * @throws AppException
  149. * @Description: 删除fastdfs服务器中文件
  150. */
  151. public void deleteFile(String group_name, String remote_filename)
  152. throws AppException {
  153. String logId = UUID.randomUUID().toString();
  154. LOGGER.info("[ 删除文件(deleteFile)][" + logId + "][parms:group_name="
  155. + group_name + ",remote_filename=" + remote_filename + "]");
  156. TrackerServer trackerServer = null;
  157. try {
  158. /** 获取可用的tracker,并创建存储server */
  159. trackerServer = connectionPool.checkout(logId);
  160. StorageServer storageServer = null;
  161. StorageClient1 client1 = new StorageClient1(trackerServer,
  162. storageServer);
  163. /** 删除文件,并释放 trackerServer */
  164. //int result = client1.delete_file(group_name, remote_filename);
  165. int result = client1.delete_file1(remote_filename);
  166. /** 上传完毕及时释放连接 */
  167. connectionPool.checkin(trackerServer, logId);
  168. LOGGER.info("[ 删除文件(deleteFile)--调用fastdfs客户端返回结果][" + logId
  169. + "][results:result=" + result + "]");
  170. //** 0:文件删除成功,2:文件不存在 ,其它:文件删除出错 *//*
  171. if (result == 2) {
  172. throw ERRORS.NOT_EXIST_FILE.ERROR();
  173. } else if (result != 0) {
  174. throw ERRORS.DELETE_RESULT_ERROR.ERROR();
  175. }
  176. } catch (AppException e) {
  177. LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
  178. throw e;
  179. } catch (SocketTimeoutException e) {
  180. LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
  181. throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
  182. } catch (Exception e) {
  183. LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
  184. connectionPool.drop(trackerServer, logId);
  185. throw ERRORS.SYS_ERROR.ERROR();
  186. }
  187. }
  188. /**
  189. * @param fileld 远程文件名称
  190. * @throws AppException
  191. * @Description: 删除fastdfs服务器中文件
  192. */
  193. public boolean delFile(String fileld)
  194. throws AppException {
  195. int count = 0;
  196. String logId = UUID.randomUUID().toString();
  197. LOGGER.info("[ 删除文件(deleteFile)][" + logId + "],remote_filename=" + fileld + "]");
  198. TrackerServer trackerServer = null;
  199. try {
  200. /** 获取可用的tracker,并创建存储server */
  201. trackerServer = connectionPool.checkout(logId);
  202. StorageServer storageServer = null;
  203. StorageClient1 client1 = new StorageClient1(trackerServer,
  204. storageServer);
  205. /** 删除文件,并释放 trackerServer */
  206. count = client1.delete_file1(fileld);
  207. /** 上传完毕及时释放连接 */
  208. connectionPool.checkin(trackerServer, logId);
  209. LOGGER.info("[ 删除文件(deleteFile)--调用fastdfs客户端返回结果][" + logId
  210. + "][results:result=" + count + "]");
  211. } catch (AppException e) {
  212. LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
  213. throw e;
  214. } catch (SocketTimeoutException e) {
  215. LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
  216. throw ERRORS.WAIT_IDLECONNECTION_TIMEOUT.ERROR();
  217. } catch (Exception e) {
  218. LOGGER.error("[ 删除文件(deleteFile)][" + logId + "][异常:" + e + "]");
  219. connectionPool.drop(trackerServer, logId);
  220. throw ERRORS.SYS_ERROR.ERROR();
  221. }
  222. return count == 0 ? true : false;
  223. }
  224. public ConnectionPool getConnectionPool() {
  225. return connectionPool;
  226. }
  227. public void setConnectionPool(ConnectionPool connectionPool) {
  228. this.connectionPool = connectionPool;
  229. }
  230. public long getMinPoolSize() {
  231. return minPoolSize;
  232. }
  233. public void setMinPoolSize(long minPoolSize) {
  234. this.minPoolSize = minPoolSize;
  235. }
  236. public long getMaxPoolSize() {
  237. return maxPoolSize;
  238. }
  239. public void setMaxPoolSize(long maxPoolSize) {
  240. this.maxPoolSize = maxPoolSize;
  241. }
  242. public long getNowPoolSize() {
  243. return nowPoolSize;
  244. }
  245. public void setNowPoolSize(long nowPoolSize) {
  246. this.nowPoolSize = nowPoolSize;
  247. }
  248. public long getWaitTimes() {
  249. return waitTimes;
  250. }
  251. public void setWaitTimes(long waitTimes) {
  252. this.waitTimes = waitTimes;
  253. }
  254. }

以上就是fastdfs连接池配置。查看日志:

对于fastDFS安装请参考:https://blog.csdn.net/u011663149/article/details/85321181


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/169880
推荐阅读
相关标签
  

闽ICP备14008679号