使用RabbitMQ实现异步上传
目录
- 使用RabbitMQ实现异步上传
- mq异步模式
- rabbitmq介绍
- 安装rabbitmq
- 打开管理后台
- 代码演示
-
mq异步模式
rabbitmq介绍
安装rabbitmq
# 创建数据目录
mkdir /data/rabbitmq
# 启动mq
docker run -d --hostname rabbit-svr --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /data/rabbitmq:/var/lib/rabbitmq rabbitmq:management
打开管理后台
http://ip:15672
账号: guest
密码: guest
代码演示
package mq
import "github.com/jyyds/filestore/common"
// 转移队列中消息载体的结构体格式
type TransferData struct {
FileHash string
CurLocation string
DestLocation string
DestStoreType common.StoreType
}
生产者
package mq
import (
"log"
"github.com/jyyds/filestore/config"
"github.com/streadway/amqp"
)
var conn *amqp.Connection
var channel *amqp.Channel
func initChannel() bool {
// 1.判断channel是否已经创建过
if channel != nil {
return true
}
// 2.获取rabbitmq的一个连接
conn, err := amqp.Dial(config.RabbitURL)
if err != nil {
log.Println(err.Error())
return false
}
// 3.打开一个channel,用于消息的发布与接收
channel, err = conn.Channel()
if err != nil {
log.Println(err.Error())
return false
}
return true
}
// 发布消息
func Puublish(exchange, routingKey string, msg []byte) bool {
// 1.判断channel是否正常
if !initChannel() {
return false
}
// 2.执行消息发布动作
err := channel.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: msg,
},
)
if err != nil {
log.Println(err.Error())
return false
}
return true
}
data := mq.TransferData{
FileHash: fileMeta.FileSha1,
CurLocation: fileMeta.Location,
DestLocation: ossPath,
DestStoreType: cmn.StoreOSS,
}
pubData, _ := json.Marshal(data)
suc := mq.Puublish(
cfg.TransExchangeName,
cfg.TransOSSRoutingKey,
pubData,
)
消费者
package mq
import "log"
var done chan bool
// 开始监听队列,获取信息
func StartConsume(qName, cName string, callBack func(msg []byte) bool) {
// 1. 用过channel.Consume获取消息信道
msgs, err := channel.Consume(
qName,
cName,
true,
false,
false,
false,
nil,
)
if err != nil {
log.Println(err.Error())
return
}
// 2. 循环获取队列的消息
go func() {
for msg := range msgs {
// 3.调用callback方法处理新的消息
procssSuc := callBack(msg.Body)
if !procssSuc {
// TODO:将任务写到另外一个队列,用于异常情况的重试
}
}
}()
// done没有新的消息过来,会一直发生阻塞
<-done
// 关闭rabbitmq
channel.Close()
}
package main
import (
"bufio"
"encoding/json"
"log"
"os"
"github.com/jyyds/filestore/config"
dblayer "github.com/jyyds/filestore/db"
"github.com/jyyds/filestore/mq"
"github.com/jyyds/filestore/store/oss"
)
func ProcessTransfer(msg []byte) bool {
// 1. 解析msg
pubData := mq.TransferData{}
err := json.Unmarshal(msg, pubData)
if err != nil {
log.Println(err.Error())
return false
}
// 2. 根据临时存储文件路径,创建文件句柄
filed, err := os.Open(pubData.CurLocation)
if err != nil {
log.Println(err.Error())
return false
}
// 3. 通过文件句柄将文件内容读出来并且上传到OSS
err = oss.Bucket().PutObject(
pubData.DestLocation,
bufio.NewReader(filed),
)
if err != nil {
log.Println(err.Error())
return false
}
// 4. 更新文件的存储路径到文件表
suc := dblayer.UpdateFileLocation(
pubData.FileHash,
pubData.DestLocation,
)
if !suc {
log.Println(err.Error())
return false
}
return true
}
func main() {
log.Println("开始监听转移任务队列...")
mq.StartConsume(
config.TransOSSQueueName,
"transfer_oss",
ProcessTransfer,
)
}