栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

go实现rabbitmq简单模式demo

Java 更新时间:发布时间: 百科书网 趣学号

文章目录
  • 前言
  • 项目目录
  • 代码
  • 测试结果

前言

最近在学习go语言,在网上学了下go使用rabbitmq,顺便记录下自己的学习的代码,下面直接贴代码
主要参考这篇文章参考的文章

项目目录

使用idea创建一个go module项目,demo目录结构如下

代码

RabbitMQ

package rabbitmq

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const MQURL = "amqp://cchuser:cchuser@localhost:5672/cch"

type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	queueName string
	exchange  string
	key       string
	mqurl     string
}

func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {

	rabbitMQ := &RabbitMQ{
		queueName: queueName,
		exchange:  exchange,
		key:       key,
		mqurl:     MQURL,
	}

	dial, err := amqp.Dial(rabbitMQ.mqurl)
	rabbitMQ.failOnErr(err, "创建连接失败")
	rabbitMQ.conn = dial

	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
	rabbitMQ.failOnErr(err, "获取通道失败")

	return rabbitMQ
}

func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%sn", err, message)
		panic(fmt.Sprintf("%s:%sn", err, message))
	}
}

func (r *RabbitMQ) destory() {
	r.channel.Close()
	r.conn.Close()
}

func NewSimpleRabbitMQ(queueName string) *RabbitMQ {

	return NewRabbitMQ(queueName, "", "")
}

func (r *RabbitMQ) Publish(message string) {

	_, err := r.channel.QueueDeclare(
		r.queueName,
		//是否持久化
		false,
		//是否为自动删除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//额外属性
		nil,
	)
	if err != nil {
		panic(err)
	}

	r.channel.Publish(
		r.exchange,
		r.queueName,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})

}

func (r *RabbitMQ) Consumer() {

	_, err := r.channel.QueueDeclare(r.queueName, false, false, false, false, nil)

	if err != nil {
		panic(err)
	}

	//2、接收消息
	msgs, err := r.channel.Consume(
		r.queueName,
		//用来区分多个消费者
		"",
		//是否自动应答
		true,
		//是否具有排他性
		false,
		//如果设置为true,表示不能将同一个connection中发送的消息传递给这个connection中的消费者
		false,
		//消息队列是否阻塞
		false,
		nil,
	)
	if err != nil {
		panic(err)
	}

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message : %s", d.Body)
		}
	}()

	log.Printf("[*] Waiting for messagees,To exit press CTRL+C")

	<-forever

}

producer

package main

import (
	"fmt"
	"rbbitmq_demo/mq2/rabbitmq"
	"strconv"
	"time"
)

func main() {

	rabbitMQ := rabbitmq.NewSimpleRabbitMQ("test-one")

	for i := 0; i < 100000; i++ {
		time.Sleep(100 * time.Millisecond)
		rabbitMQ.Publish("新消息 " + strconv.Itoa(i))
		fmt.Println("发送成功")
	}

}

consumer

package main

import "rbbitmq_demo/mq2/rabbitmq"

func main() {

	rabbitMQ := rabbitmq.NewSimpleRabbitMQ("test-one")

	rabbitMQ.Consumer()
}
测试结果

先启动consumer消费者,再启动生产者,可以看到消费者成功消费到消息

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/275230.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号