赞
踩
一、简要概述
二、系统架构
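Ceph 系统架构可以划分为以下四部分:客户端(Clients,数据的使用者)、元数据服务器集群(Metadata Server,缓存和同步分布式元数据)、对象存储集群(Object Storage Cluster,将数据和元数据作为对象存储)以及集群监视器(Cluster Monitors,执行监视功能)。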
Ceph的生态系统的概念架构如下图:
三、代码实现
<!-- CEPH amazonaws-sdk start-->
<!-- CEPH start-->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>${com.amazonaws-version}</version>
    <exclusions>
        <!-- jackson-databind is excluded from the SDK's transitive dependencies;
             presumably the project supplies its own version (confirm) -->
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- The minio coordinates were fused into the dependency above, which is not valid
     POM markup; they are declared here as a dependency of their own -->
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>7.1.0</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>${com.amazonaws-version}</version>
</dependency>
<dependency>
    <groupId>org.jetbrains</groupId>
    <artifactId>annotations</artifactId>
    <!-- NOTE(review): RELEASE is a floating meta-version; pin a concrete version
         for reproducible builds -->
    <version>RELEASE</version>
</dependency>
<!-- CEPH amazonaws-sdk end-->
<!-- CEPH end-->
# Ceph config
# Replace every xxxxxx placeholder with the real value. The explanations are kept in
# comments because anything written after '=' becomes part of the property value.

# Access key of the S3 account (also known as the user ID)
ceph.accessKey=xxxxxx
# Secret key of the S3 account (also known as the password)
ceph.secretKey=xxxxxx
# Endpoint of the Ceph gateway the S3 client connects to, e.g. http://127.0.0.1:7480
ceph.domain.ip=xxxxxx
<!-- Reads several properties files into one bean that is managed by the Spring container -->
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<!-- Several addressing schemes are supported here: classpath and file -->
<value>classpath:/properties/ceph.properties</value>
<value>classpath:/properties/jdbc.properties</value>
<value>classpath:/properties/redis.properties</value>
<value>classpath:/properties/xxl-job.properties</value>
</list>
</property>
</bean>
<!-- Hands the merged properties of the configProperties bean to Spring's placeholder configurer -->
<bean class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
<property name="properties" ref="configProperties" />
</bean>
/**
 * Static helpers for working with a Ceph object gateway through the Amazon S3 client.
 *
 * <p>A single shared client is created when the class is initialized. Its settings are read
 * from {@code properties/ceph.properties} on the classpath, using the keys
 * {@code ceph.accessKey}, {@code ceph.secretKey} and {@code ceph.domain.ip}. No credentials
 * are kept in source code.
 */
public class CephUtils {

    private static final Logger log = LoggerFactory.getLogger(CephUtils.class);

    /** Classpath location of the connection settings. */
    private static final String CONFIG_RESOURCE = "properties/ceph.properties";

    /** Prefix of the per-day bucket name; the current date follows it. */
    private static final String BUCKET_PREFIX = "bst-";

    /** Date pattern appended to {@link #BUCKET_PREFIX}. */
    private static final String BUCKET_DATE_PATTERN = "yyyyMMdd";

    /** Shared S3 client, built once from the properties file. */
    private static final AmazonS3 conn = createClient();

    /**
     * Not a constructor: the {@code void} return type makes this an ordinary method that
     * merely shares the class name. It does nothing and is retained only so that existing
     * callers keep compiling.
     */
    public void CephUtils() {
    }

    /** Builds the shared client from the loaded settings (HTTP protocol, configured endpoint). */
    private static AmazonS3 createClient() {
        Properties settings = loadSettings();
        AWSCredentials credentials = new BasicAWSCredentials(
                settings.getProperty("ceph.accessKey"),
                settings.getProperty("ceph.secretKey"));
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProtocol(Protocol.HTTP);
        AmazonS3 client = new AmazonS3Client(credentials, clientConfig);
        client.setEndpoint(settings.getProperty("ceph.domain.ip"));
        return client;
    }

    /** Reads the settings file from the classpath, failing fast when it is missing or unreadable. */
    private static Properties loadSettings() {
        Properties settings = new Properties();
        try (InputStream in = CephUtils.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(
                        "Ceph configuration not found on classpath: " + CONFIG_RESOURCE);
            }
            settings.load(in);
        } catch (IOException e) {
            // Continuing with empty settings would only fail later with a less useful error.
            throw new UncheckedIOException("Failed to read Ceph configuration: " + CONFIG_RESOURCE, e);
        }
        return settings;
    }

    /** Returns the name of the bucket for the current date: prefix plus formatted date. */
    private static String currentDateBucketName() {
        return BUCKET_PREFIX + DateUtil.getCurrentDate(BUCKET_DATE_PATTERN);
    }

    /** Returns whether the bucket list of the account contains a bucket with the given name. */
    private static boolean bucketExists(String bucketName) {
        for (Bucket bucket : conn.listBuckets()) {
            if (bucketName.equals(bucket.getName())) {
                return true;
            }
        }
        return false;
    }

    /** Creates a bucket with the given name, logs the result and returns the name. */
    private static String createBucketNamed(String bucketName) {
        Bucket bucket = conn.createBucket(bucketName);
        log.info("{}", JSON.toJSONString(bucket));
        return bucketName;
    }

    /** Uploads a byte array as one object, declaring its length up front. */
    private static PutObjectResult putBytes(String bucketName, String key, byte[] contents) {
        ObjectMetadata metadata = new ObjectMetadata();
        // The length is known here, so declare it instead of leaving the metadata empty.
        metadata.setContentLength(contents.length);
        PutObjectResult result =
                conn.putObject(bucketName, key, new ByteArrayInputStream(contents), metadata);
        if (log.isDebugEnabled()) {
            log.debug("{}", JSON.toJSONString(result));
        }
        return result;
    }

    /**
     * Lists all buckets and logs each bucket's name and creation date.
     *
     * @return the buckets returned by the client
     */
    public static List<Bucket> getBucketList() {
        List<Bucket> buckets = conn.listBuckets();
        for (Bucket bucket : buckets) {
            log.info("{}\t{}", bucket.getName(), StringUtils.fromDate(bucket.getCreationDate()));
        }
        return buckets;
    }

    /**
     * Checks whether the bucket for the current date already exists.
     *
     * @return {@code true} when a bucket named prefix + current date is in the bucket list
     */
    public static boolean getBucketIsCreated() {
        return bucketExists(currentDateBucketName());
    }

    /**
     * Returns the name of the bucket for the current date, creating the bucket when it is
     * not in the bucket list yet.
     *
     * @return the bucket name
     */
    public static String getCurrentDateBucketName() {
        String bucketName = currentDateBucketName();
        return bucketExists(bucketName) ? bucketName : createBucketNamed(bucketName);
    }

    /**
     * Issues a create-bucket call for the given name, then walks through every page of its
     * object listing, logging key, size and last-modified time of each object.
     *
     * @param bucketName name of the bucket to create and list
     * @return the last page of the listing
     */
    public static ObjectListing getObjectListing(String bucketName) {
        Bucket bucket = conn.createBucket(bucketName);
        ObjectListing objects = conn.listObjects(bucket.getName());
        while (true) {
            for (S3ObjectSummary summary : objects.getObjectSummaries()) {
                log.info("{}\t{}\t{}", summary.getKey(), summary.getSize(),
                        StringUtils.fromDate(summary.getLastModified()));
            }
            // Check truncation only after the page was processed so the final page is not skipped.
            if (!objects.isTruncated()) {
                return objects;
            }
            objects = conn.listNextBatchOfObjects(objects);
        }
    }

    /**
     * Creates the bucket for the current date.
     *
     * @return the name of the created bucket
     */
    public static String createBucket() {
        return createBucketNamed(currentDateBucketName());
    }

    /**
     * Deletes a bucket.
     *
     * @param bucketName name of the bucket to delete
     */
    public static void deleteBucket(String bucketName) {
        conn.deleteBucket(bucketName);
    }

    /**
     * Uploads a string as an object; the text is encoded as UTF-8.
     *
     * @param bucketName target bucket
     * @param fileName   object key
     * @param text       content to store
     */
    public static void uploadStream(String bucketName, String fileName, String text) {
        putBytes(bucketName, fileName, text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Sets the ACL of an object to public-read.
     *
     * @param bucketName bucket holding the object
     * @param fileName   object key
     */
    public static void modifyPub(String bucketName, String fileName) {
        conn.setObjectAcl(bucketName, fileName, CannedAccessControlList.PublicRead);
    }

    /**
     * Downloads an object into a local file.
     *
     * @param bucketName bucket holding the object
     * @param keyName    object key
     * @param dirName    path of the local file the content is written to
     */
    public static void downloadFile(String bucketName, String keyName, String dirName) {
        conn.getObject(new GetObjectRequest(bucketName, keyName), new File(dirName));
    }

    /**
     * Deletes an object.
     *
     * @param bucketName bucket holding the object
     * @param fileName   object key
     */
    public static void deleteObject(String bucketName, String fileName) {
        conn.deleteObject(bucketName, fileName);
    }

    /**
     * Generates a presigned download URL for an object.
     *
     * @param bucketName bucket holding the object
     * @param keyName    object key
     * @return the presigned URL
     */
    public static URL geturl(String bucketName, String keyName) {
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, keyName);
        return conn.generatePresignedUrl(request);
    }

    /**
     * Uploads a local file and returns a presigned URL for it.
     *
     * <p>An upload failure is logged and rethrown, like in the other methods of this class,
     * so that no URL is handed out for an object that was never stored.
     *
     * @param bucketName target bucket
     * @param file       local file to upload
     * @param keyName    object key
     * @return the presigned URL of the uploaded object
     */
    public static URL uploadFileToUrl(String bucketName, File file, String keyName) {
        try {
            conn.putObject(new PutObjectRequest(bucketName, keyName, file));
        } catch (RuntimeException e) {
            log.error("上传文件异常", e);
            throw e;
        }
        return conn.generatePresignedUrl(new GeneratePresignedUrlRequest(bucketName, keyName));
    }

    /**
     * Uploads the content of an input stream as an object. The stream is passed to the
     * client with empty metadata because its length is not known here.
     *
     * @param bucketName target bucket
     * @param fileName   object key
     * @param input      content to store
     */
    public static void uploadInputStream(String bucketName, String fileName, InputStream input) {
        PutObjectResult putObjectResult =
                conn.putObject(bucketName, fileName, input, new ObjectMetadata());
        if (log.isDebugEnabled()) {
            log.debug("{}", JSON.toJSONString(putObjectResult));
        }
    }

    /**
     * Uploads a byte array as an object.
     *
     * @param bucketName target bucket
     * @param fileName   object key
     * @param contents   content to store
     */
    public static void uploadByte(String bucketName, String fileName, byte[] contents) {
        putBytes(bucketName, fileName, contents);
    }

    /**
     * Opens an object for reading and logs its content type.
     *
     * @param bucketName bucket holding the object
     * @param fileName   object key
     * @return the content stream of the object; the caller is responsible for closing it
     */
    public static InputStream readStreamObject(String bucketName, String fileName) {
        S3Object object = conn.getObject(new GetObjectRequest(bucketName, fileName));
        log.debug("Content-Type:{}", object.getObjectMetadata().getContentType());
        return object.getObjectContent();
    }
}
@RequestMapping(value = "/ceph/test", method = RequestMethod.POST, produces = {"application/json;charset=utf-8"}) @ResponseBody public Map<String, Object> cephTest(@RequestBody String jsonStr) { //删除测试时所创建的桶名称 /*String oldBucketName = "my-new-bucket"; CephUtils.deleteBucket(oldBucketName);*/ String uuid = UUID.randomUUID().toString(); //String fileName = "message_" + uuid + ".txt"; uuid = uuid.replace("-", ""); String fileName = uuid; //先查看远程ceph系统是否存在当前日期的桶名称 bst-yyyymmdd boolean isCreated = CephUtils.getBucketIsCreated(); //如果返回true,证明当天的桶名称已存在;否则创建 if (isCreated) { String bucketName = CephUtils.getCurrentDateBucketName(); CephUtils.uploadByte(bucketName, fileName, jsonStr.getBytes()); } else { //新创建桶名称 CephUtils.createBucket(); String bucketName = CephUtils.getCurrentDateBucketName(); CephUtils.uploadByte(bucketName, fileName, jsonStr.getBytes()); } Map<String, Object> map = new HashMap<>(); map.put("message", "成功=" + fileName); map.put("message", "成功=" + bucketName); return map; }
四、总结
1. 以上就是我最近在公司项目开发中使用 Ceph 分布式文件系统存储文件内容的实践,整个过程通过 Java 代码连接远端 Ceph 服务器来完成。
2. 如果大家需要使用 Ceph 文件系统进行相关操作,欢迎在评论区提问,我会尽我所能地帮助大家。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。