栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

2021-09-29

Java 更新时间:发布时间: 百科书网 趣学号
Centos7 ceph集群安装 cephfs客户端挂载 java代码调用
  1. ceph简介
    ceph分为ceph monitor,ceph manager(管理器),ceph ODS(对象存储守护进程),ceph MDS(元数据服务器)用于存储文件

  2. ceph安装部署
    2.1部署环境

IP安装服务内核主机名
192.168.198.129admin-notecentos7admin-note
192.168.198.130osdcentos7note2
192.168.198.131osdcentos7note3
192.168.198.132osd,mdscentos7note4
2.2关闭selinux和防火墙(所有节点)
#关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
#关闭selinux
setenforce 0
sed -i '/SELINUX/s/enforcing/disabled/' /etc/selinux/config
2.3设置时间同步(所有节点)
# yum 安装 ntp
yum install ntp ntpdate ntp-doc
# 校对系统时钟
ntpdate 0.cn.pool.ntp.org
2.3修改主机名和hosts文件(所有节点)
#192.168.198.129上操作
hostnamectl set-hostname admin-node
#192.168.198.130上操作
hostnamectl set-hostname node2
#192.168.198.131上操作
hostnamectl set-hostname node3
#192.168.198.132上操作
hostnamectl set-hostname node4
#所有节点上操作
vi /etc/hosts
#将以下内容增加到/etc/hosts中
192.168.198.129  admin-node
192.168.198.130  node2
192.168.198.131  node3
192.168.198.132  node4
2.4创建 Ceph 部署用户并设置其远程登录密码(所有节点)
# 创建 ceph 特定用户
useradd -d /home/cephd -m cephd
echo cephd | passwd cephd --stdin
# 添加 sudo 权限
echo "cephd ALL = (root) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/cephd
chmod 0440 /etc/sudoers.d/cephd
# node1、node2、node3上操作,设置cephd用户的远程登录密码,记住此密码,后面配置免密登录会用到
passwd cephd
2.5配置免密登录(admin-node节点)
# 切换到cephd用户
su cephd
# 生成ssh密钥,一路回车即可
ssh-keygen
# 将公钥复制到 node1 节点,输入cephd的用户密码即可
ssh-copy-id node1
# 将公钥复制到 node2 节点,输入cephd的用户密码即可
ssh-copy-id node2
# 将公钥复制到 node3 节点,输入cephd的用户密码即可
ssh-copy-id node3
2.6Ceph集群搭建(ceph version 10.2.11)(admin-note)
配置ceph源
# yum 配置其他依赖包
sudo yum install -y yum-utils && sudo yum-config-manager --add-repo https://dl.fedoraproject.org/pub/epel/7/x86_64/ && sudo yum install --nogpgcheck -y epel-release && sudo rpm --import /etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7 && rm /etc/yum.repos.d/dl.fedoraproject.org*
# 添加 Ceph 源
sudo vi /etc/yum.repos.d/ceph.repo
[ceph]
name=ceph
baseurl=http://mirrors.aliyun.com/ceph/rpm-jewel/el7/x86_64/
gpgcheck=0
[ceph-noarch]
name=cephnoarch
baseurl=http://mirrors.aliyun.com/ceph/rpm-jewel/el7/noarch/
gpgcheck=0
安装
sudo yum update && sudo yum install ceph-deploy
2.7修改 ceph-deploy 管理节点上的 ~/.ssh/config 文件,这样无需每次执行 ceph-deploy 都要指定 –username cephd(admin-node)
sudo vi ~/.ssh/config
Host admin-note
   Hostname admin-note
   User cephd
Host note2
   Hostname note2
   User cephd
Host note3
   Hostname note3
   User cephd
Host note4
   Hostname note4
   User cephd
sudo chmod 600 ~/.ssh/config
2.8创建集群 (admin-note)
	(1)创建集群(在管理节点master上)
# 创建执行目录
mkdir /home/cephd/ceph-cluster && cd  /home/cephd/ceph-cluster
# 创建集群 mon节点
sudo ceph-deploy new admin-note
	(2)把 Ceph 配置文件里的默认副本数从 3 改成 2 ,这样只有两个 OSD 也可以达到 active + clean 状态
vim /home/ceph/my-cluster/ceph.conf 添加如下内容:
osd pool default size = 2
	(3)安装ceph(admin-node)
ceph-deploy install admin-note,note2,note3
	(4)初始化 monitor 节点并收集所有密钥(admin-node)
sudo ceph-deploy --overwrite-conf mon create-initial
	(5)添加osd进程
	添加两个 OSD 。为了快速地安装,这篇快速入门把目录而非整个硬盘用于 OSD 守护进程。登录到 Ceph 节点、并给 OSD 守护进程创建一个目录。在这里插入代码片
创建osd0的工作目录   (在note2节点上)
sudo mkdir /var/local/osd0
创建osd1的工作目录   (在note3节点上)
sudo mkdir /var/local/osd1
准备OSD      (在admin-note节点上)
ceph-deploy osd prepare slave1:/var/local/osd0 slave2:/var/local/osd1
激活OSD      (master节点)
ceph-deploy osd activate note2:/var/local/osd0 note3:/var/local/osd1
	(6)用 ceph-deploy 把配置文件和 admin 密钥拷贝到管理节点和 Ceph 节点,这样你每次执行 Ceph 命令行时就无需指定 monitor 地址和 ceph.client.admin.keyring 了  (master节点执行)
ceph-deploy admin master slave1 slave2
#添加操作权限
sudo chmod +r /etc/ceph/ceph.client.admin.keyring
#检查健康状态
ceph health
  1. cephfs部署挂载
    3.1部署
    (1)安装mds(admin-note)
ceph-deploy mds create note2
	(2)创建储存池(note2)
ceph osd pool create cephfs_data 128
pool 'cephfs_data' created
ceph osd pool create cephfs_metadata 128
pool 'cephfs_metadata' created
#使用fs new命令enable 文件系统
ceph fs new cephfs cephfs_metadata cephfs_data
#查看cephfs信息
ceph fs ls
#查看cephfs状态
ceph mds stat
3.2挂载cephfs
Cephfs有两种挂载方式:kernel或fuse
(1)kernel内核挂载
#创建挂载目录
mkdir /mnt/mycephfs  
#创建保存admin密钥文件
vi admin.secret
#挂载
mount -t ceph 192.168.198.129:6789:/ /mnt/mycephfs -o name=admin,secretfile=/root/admin.secret  
#查看挂载
 df -Th | grep cephfs
(2)fuse挂载
#先取消之前的挂载
umount /mnt/mycephfs/
#下载工具包
yum -y install ceph-fuse
mkdir ~/mycephfs
ceph-fuse -k /etc/ceph/ceph.client.admin.keyring -m 192.168.198.129:6789 ~/mycephfs/
(3)设置开机自动挂载/etc/fstab
id=admin,conf=/etc/ceph/ceph.conf  /mnt fuse.ceph defaults 0 0
  1. java代码调用

(1)maven引入jar包


            com.ceph
            libcephfs
            0.80.5
        
        
        
            com.ceph
            rados
            0.3.0
        
package com.ctsi.config;

import com.ceph.fs.CephFileExtent;
import com.ceph.fs.CephMount;
import com.ceph.fs.CephStat;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.*;
import java.util.Arrays;
public class CephOperateConfig {
    private  CephMount mount;
    private  String username;
    private String monIp;
    private String userKey;
    public CephOperateConfig(String username, String monIp, String userKey, String mountPath) {
        this.username = username;
        this.monIp = monIp;
        this.userKey = userKey;
        this.mount = new CephMount(username);
        this.mount.conf_set("mon_host", monIp);
        mount.conf_set("key",userKey);
        mount.mount(mountPath);
    }

    @SneakyThrows
    public static void main(String[] args) {
        String name="client.devops-test";
        String key="AQD6a8deA0aFKxAAGJRBBnHFCikG7hIxHUbYVA==";
        String ip="10.2.2.6,10.2.2.7,10.2.2.8,10.2.2.9,10.2.2.10";
        String path="/data/cephfs/blockchain/";
        CephOperateConfig cephOperateConfig=new CephOperateConfig(name,ip,key,path);
        cephOperateConfig.listDir(path);
    }

    //查看目录列表
    public void listDir(String path) throws IOException {
        String[] dirs = mount.listdir(path);
        System.out.println("contents of the dir: " + Arrays.asList(dirs));
    }

    //新建目录
    public void mkDir(String path) throws IOException {
        mount.mkdirs(path,0755);//0表示十进制
    }

    //删除目录
    public void delDir(String path) throws IOException {
        mount.rmdir(path);
    }

    //重命名目录or文件
    public void renameDir(String oldName, String newName) throws IOException {
        mount.rename(oldName, newName);
    }

    //删除文件
    public void delFile(String path) throws IOException {
        mount.unlink(path);
    }

    //读文件

    public void readFile(String path) {
        System.out.println("start read file...");
        int fd = -1;
        try{
            fd = mount.open(path, CephMount.O_RDWR, 0755);

            System.out.println("file fd is : " + fd);

            byte[] buf = new byte[1024];
            long size = 10;
            long offset = 0;
            long count = 0;
            while((count = mount.read(fd, buf, size, offset)) > 0){
                for(int i = 0; i < count; i++){
                    System.out.print((char)buf[i]);
                }
                offset += count;
            }

        } catch (IOException e){
            e.printStackTrace();
        } finally {
            if(fd > 0){
                mount.close(fd);
            }
        }
    }

    //复制文件

    public void copyFile(String sourceFile, String targetFile){
        System.out.println("start write file...");
        int readFD = -1, createAA = -1, writeFD = -1;
        try{
            readFD = mount.open(sourceFile, CephMount.O_RDWR, 0755);
            writeFD = mount.open(targetFile, CephMount.O_RDWR | CephMount.O_CREAT, 0644);
//                createAA = mountLucy.open("aa.txt", CephMount.O_RDWR | CephMount.O_CREAT | CephMount.O_EXCL, 0644);//若文件已有, 会异常
            System.out.println("file read fd is : " + readFD);

            byte[] buf = new byte[1024];
            long size = 10;
            long offset = 0;
            long count = 0;
            while((count = mount.read(readFD, buf, size, -1)) > 0){
                mount.write(writeFD, buf, count, -1);//-1指针跟着走,若取值count,指针不动
                System.out.println("offset: " + offset);
                offset += count;
                System.out.println("writeFD position : " + mount.lseek(writeFD, 0, CephMount.SEEK_CUR));
            }

        } catch (IOException e){
            e.printStackTrace();
        } finally {
            if(readFD > 0){
                mount.close(readFD);
            }
            if(writeFD > 0){
                mount.close(writeFD);
            }
        }
    }

    //写文件

    public void writeFileWithLseek(String path, long offset, int type){
        if(type <= 0){
            type =CephMount.SEEK_CUR;
        }
        System.out.println("start write file...");
        int writeFD = -1;
        try{
            writeFD = mount.open(path, CephMount.O_RDWR | CephMount.O_APPEND, 0644);
            long pos = mount.lseek(writeFD, offset, type);
            System.out.println("pos : " + pos);
            String msg = " asdfasdfasdf123123123 n";
            byte[] buf = msg.getBytes();
            mount.write(writeFD, buf, buf.length, pos);

        } catch (IOException e){
            e.printStackTrace();
        } finally {
            if(writeFD > 0){
                mount.close(writeFD);
            }
        }
    }

    // 判断是目录还是文件

    public void listFileOrDir(){
        int writeFD = -1;
        try{
            String[] lucyDir = mount.listdir("/");
            for(int i = 0; i < lucyDir.length; i++){
                CephStat cephStat = new CephStat();
                mount.lstat(lucyDir[i], cephStat);
                System.out.println(lucyDir[i] + " is dir : " + cephStat.isDir()
                        + " is file: " + cephStat.isFile()
                        + " size: " + cephStat.size
                        + " blksize: " + cephStat.blksize);//cephStat.size就是文件大小
            }

            writeFD = mount.open("lucy1.txt", CephMount.O_RDWR | CephMount.O_APPEND, 0644);
            CephFileExtent cephFileExtent = mount.get_file_extent(writeFD, 0);
            System.out.println("lucy1.txt size: " + cephFileExtent.getLength());//4M
            System.out.println("lucy1.txt stripe unit: " + mount.get_file_stripe_unit(writeFD));//4M
            long pos = mount.lseek(writeFD, 0, CephMount.SEEK_END);
            System.out.println("lucy1.txt true size: " + pos);//30Byte

        } catch (IOException e){
            e.printStackTrace();
        } finally {
            if(writeFD > 0){
                mount.close(writeFD);
            }
        }
    }

    //set current dir (work dir)

    public void setWorkDir(String path) throws IOException {
        mount.chdir(path);
    }


    //外部获取mount

    public CephMount getMount(){
        return this.mount;
    }

    //umount

    public void umount(){
        mount.unmount();
    }


    public Boolean uploadFileByPath(String filePath, String fileName){

        // exit with null if not mount
        if (this.mount == null){

            return null;
        }

        // file definition
        char pathChar = File.separatorChar;
        String fileFullName = "";
        Long fileLength = 0l;
        Long uploadedLength = 0l;
        File file = null;

        // Io
        FileInputStream fis = null;

        // get local file info
        fileFullName = filePath + pathChar + fileName;
        file = new File(fileFullName);
        if (!file.exists()){

            return false;
        }
        fileLength = file.length();

        // get io from local file
        try {
            fis = new FileInputStream(file);
        }catch (FileNotFoundException e){

            e.printStackTrace();
        }

        // if file exists or not
        String[] dirList = null;
        Boolean fileExist = false;
        try {
            dirList = this.mount.listdir("/");
            for (String fileInfo : dirList){
                if (fileInfo.equals(fileName)){
                    fileExist = true;
                }
            }
        }catch (FileNotFoundException e){

            e.printStackTrace();
        }

        // transfer file by diff pattern
        if (!fileExist){
            try {
                // create file and set mode WRITE
                this.mount.open(fileName, CephMount.O_CREAT, 0);
                int fd = this.mount.open(fileName, CephMount.O_RDWR, 0);

                // start transfer
                int length = 0;
                byte[] bytes = new byte[1024];
                while ((length = fis.read(bytes, 0, bytes.length)) != -1){
                    // write
                    this.mount.write(fd, bytes, length, uploadedLength);

                    // update length
                    uploadedLength += length;

                    // output transfer rate
                    float rate = (float)uploadedLength * 100 / (float)fileLength;
                    String ratevalue = (int)rate + "%";
                    System.out.println(ratevalue);

                    // complete flag
                    if (uploadedLength == fileLength){
                        break;
                    }
                }
                System.out.println("文件传输成功!");

                // chmod
                this.mount.fchmod(fd, 0666);

                // close
                this.mount.close(fd);
                if (fis != null){
                    fis.close();
                }
                return true;
            }catch (Exception e){

                e.printStackTrace();
            }
        }else if (fileExist){
            try {
                // get file length
                CephStat stat = new CephStat();
                this.mount.stat(fileName, stat);
                uploadedLength = stat.size;
                int fd = this.mount.open(fileName, CephMount.O_RDWR, 0);

                // start transfer
                int length = 0;
                byte[] bytes = new byte[1024];
                fis.skip(uploadedLength);
                while ((length = fis.read(bytes, 0, bytes.length)) != -1){
                    // write
                    this.mount.write(fd, bytes, length, uploadedLength);

                    // update length
                    uploadedLength += length;

                    // output transfer rate
                    float rate = (float)uploadedLength * 100 / (float)fileLength;
                    String ratevalue = (int)rate + "%";
                    System.out.println(ratevalue);

                    // complete flag
                    if (uploadedLength == fileLength){
                        break;
                    }
                }
                System.out.println("断点文件传输成功!");

                // chmod
                this.mount.fchmod(fd, 0666);

                // close
                this.mount.close(fd);
                if (fis != null){
                    fis.close();
                }
                return true;
            }catch (Exception e){

                e.printStackTrace();
            }
        }else {
            try {
                if (fis != null){
                    fis.close();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            return false;
        }

        return false;
    }


    //文件下载
    public Boolean downLoadFileByPath(String filePath,String fileName){
        char pathChar= File.separatorChar;
        String fileFullName="";
        Long fileLength=0L;
        Long downloadedLength=0L;
        File file=null;

        //IO
        FileOutputStream fos=null;
        RandomAccessFile raf=null;

        //new file object
        fileFullName=filePath+pathChar+fileName;
        file=new File(fileFullName);

        //get cephfs file size
        try{
            CephStat stat=new CephStat();
            mount.stat(fileName,stat);
            fileLength=stat.size;
        }catch (Exception e){

            e.printStackTrace();
        }
        if(fileLength!=0){
            if(!file.exists()){
                int length=10240;
                byte[] bytes=new byte[length];
                try {
                    int fd=mount.open(fileName,CephMount.O_RDONLY,0);
                    fos=new FileOutputStream(file);
                    float rate=0;
                    String ratevalue="";
                    while ((fileLength-downloadedLength)>=length&&(mount.read(fd,bytes,(long)length,downloadedLength))!=-1){
                        fos.write(bytes,0,length);
                        fos.flush();
                        downloadedLength+=(long)length;
                        rate=(float)downloadedLength*100/(float)fileLength;
                        ratevalue=(int)rate+"%";
                        System.out.println(ratevalue);
                        if(downloadedLength.equals(fileLength)){
                            break;
                        }
                    }
                    if(!downloadedLength.equals(fileLength)){
                        mount.read(fd,bytes,fileLength-downloadedLength,downloadedLength);
                        fos.write(bytes,0,(int)(fileLength-downloadedLength));
                        fos.flush();
                        downloadedLength=fileLength;
                        rate=(float)downloadedLength*100/(float)fileLength;
                        ratevalue=(int)rate+"%";
                        System.out.println(ratevalue);
                    }
                    System.out.println("Download Successs");
                    fos.close();
                    mount.close(fd);
                    return true;
                } catch (Exception e) {
                    System.out.println("download fail");
                    e.printStackTrace();
                }
            }else if(file.exists()){
                int length=10240;
                byte[] bytes=new byte[length];
                Long filePoint=file.length();
                try {
                    int fd=mount.open(fileName,CephMount.O_RDONLY,0);
                    raf=new RandomAccessFile(file,"rw");
                    raf.seek(filePoint);
                    float rate=0;
                    String ratevalue="";
                    while ((fileLength-downloadedLength)>=length&&(mount.read(fd,bytes,(long)length,downloadedLength))!=-1){
                        raf.write(bytes,0,length);
                        downloadedLength+=(long)length;
                        rate=(float)downloadedLength*100/(float)fileLength;
                        ratevalue=(int)rate+"%";
                        System.out.println(ratevalue);
                        if(downloadedLength.equals(fileLength)){
                            break;
                        }
                    }
                    if(!downloadedLength.equals(fileLength)){
                        mount.read(fd,bytes,fileLength-downloadedLength,downloadedLength);
                        raf.write(bytes,0,(int)(fileLength-downloadedLength));
                        downloadedLength=fileLength;
                        rate=(float)downloadedLength*100/(float)fileLength;
                        ratevalue=(int)rate+"%";
                        System.out.println(ratevalue);
                    }
                    System.out.println("Download Successs");
                    raf.close();
                    mount.close(fd);
                    return true;
                } catch (Exception e) {
                    System.out.println("download fail");
                    e.printStackTrace();
                }
            }
        }else{
           System.out.println("err");
            return false;
        }

        return null;
    }

}

参考资料
添加链接描述ceph官网

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/276442.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号