栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

springcloudstream+rabbitmq+eureka进行消息发送和接收实例代码

大数据系统 更新时间:发布时间: 百科书网 趣学号

文章目录
  • eureka作注册中心的配置:
  • 消息提供方:
  • 消费者代码

注册中心、消息接受者、消息提供者分别启动:

eureka作注册中心的配置:

依赖包:

  
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-server
        
        
        
            com.atguigu.springcloud
            cloud-api-commons
            ${project.version}
        
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.projectlombok
            lombok
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            junit
            junit
        
    
server:
  port: 7001


eureka:
  instance:
    hostname: eureka7001.com #eureka服务端的实例名称
  client:
    register-with-eureka: false     #false表示不向注册中心注册自己。
    fetch-registry: false     #false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
    service-url:
    #集群指向其它eureka
      #defaultZone: http://eureka7002.com:7002/eureka/
    #单机就是7001自己
      defaultZone: http://eureka7001.com:7001/eureka/
  #server:
    #关闭自我保护机制,保证不可用服务被及时踢除
    #enable-self-preservation: false
    #eviction-interval-timer-in-ms: 2000
消息提供方:

目录:

    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        
        
        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.projectlombok
            lombok
            true
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

配置:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
      stream:
        binders: # 在此处配置要绑定的rabbitmq的服务信息;
          defaultRabbit: # 表示定义的名称,用于于binding整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服务的整合处理
          output: # 这个名字是一个通道的名称
            destination: studyExchange # 表示要使用的Exchange名称定义
            content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
            binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

接口:

public interface IMessageProvider
{
    public String send();
}

@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{
    @Resource
    private MessageChannel output; // 消息发送管道

    @Override
    public String send()
    {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return null;
    }
}

controller:

@RestController
public class SendMessageController
{
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }

}

消费者代码

配置和目录与消息提供方几乎一致

server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: # 在此处配置要绑定的rabbitmq的服务信息;
          defaultRabbit: # 表示定义的名称,用于于binding整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服务的整合处理
          input: # 这个名字是一个通道的名称
            destination: studyExchange # 表示要使用的Exchange名称定义
            content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
            binder: defaultRabbit # 设置要绑定的消息服务的具体设置
            group: atguiguA

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8803.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址



controller:

@Component
@EnableBinding(Sink.class)
//@EnableBinding 对应的两个接口属性 Source 和 Sink 是 SCS 内部提供的。
// SCS 内部会基于 Source 和 Sink 构造 BindableProxyFactory,
// 且对应的 output 和 input 方法返回的 MessageChannel 是 DirectChannel。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。
//————————————————

public class ReceiveMessageListenerController
{
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message message)
    {
        System.out.println("消费者2号,----->接受到的消息: "+message.getPayload()+"t  port: "+serverPort);
    }
}

访问消息提供者的接口http://localhost:8801/sendMessage
控制台输出:

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/601925.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号