rabbitmq代码
admin
2024-02-21 21:50:24
0

spring:
  rabbitmq:
    host:  # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: zhangsan # 用户名
    password: 1234 # 密码
=============================================
public class User implements Serializable {
    private static final long serliaVersionUid = 1L;
    private String name;

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    private int age;

    public static long getSerliaVersionUid() {
        return serliaVersionUid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
===========================================================
生产者
@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){

        return new Jackson2JsonMessageConverter();
    }
}

=========================================================
@SpringBootTest(classes = PublisherApplication.class)
@RunWith(SpringRunner.class)
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test/*简单消息模型*/
    public void simple() {
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        //需要先申明一个queue
        rabbitTemplate.convertAndSend(queueName, message);
    }

    @Test/*工作消息模型*/
    public void worker() {
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        //需要先申明一个queue
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, "发送的第" + i + "条消息:" + message);
        }
    }

    @Test/*简单消息模型*/
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.238.128");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("zhangsan");
        factory.setPassword("1234");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }

    @Test/*工作消息模型*/
    public void testSendWorkerMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.238.128");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("zhangsan");
        factory.setPassword("1234");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        for (int i = 0; i < 50; i++) {
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }


    /*发布/订阅*/
    @Test/*Fanout广播*/
    public void fanout() {
        // 消息
        String message = "hello,Fanout广播!";
        //交换机
        String fanoutName = "fanoutExchange";
        // 发送消息
        //需要先申明一个queue
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(fanoutName, "", message);
        }
    }


    @Test/*Direct定向路由*/
    public void direct() {
        // 消息
        String message = "hello,Direct定向路由!";
        //交换机
        String directName = "directExchange";
        // 发送消息
        //需要先申明一个queue
        User user = new User("张三", 22);
        rabbitTemplate.convertAndSend(directName, "red", user);
        rabbitTemplate.convertAndSend(directName, "blue", user);
        rabbitTemplate.convertAndSend(directName, "yellow", user);
    }

    @Test/*Topic话题*/
    public void topic() {
        // 消息
        String message = "hello,Topic话题!";
        //交换机
        String topicName = "topicExchange";
        // 发送消息
        //需要先申明一个queue
        rabbitTemplate.convertAndSend(topicName, "china.watch", message);
        rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
        rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
    }


    @Test/*Topic话题*/
    public void topic2() {
        // 消息
        String message = "hello,Topic话题!";
        //交换机
        String topicName = "topicExchange2";
        // 发送消息
        //需要先申明一个queue
        rabbitTemplate.setExchange(topicName);
        rabbitTemplate.convertAndSend(topicName, "china.watch", message);
        rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
        rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
    }
    
    
============================================================
消费者
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
      @Bean
    public MessageConverter jsonMessageConverter(){

        return new Jackson2JsonMessageConverter();
    }
}
==========================================================
@Component
public class FanoutConfig {
    /*定义声明交换机对垒,交换机绑定*/
    //1定义一个交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    //2.定义声明一个队列
    @Bean
    public Queue queue1() {
        return new Queue("fanoutQueue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("fanoutQueue2");
    }

    //3.绑定
    @Bean
    public Binding builder1(FanoutExchange fanoutExchange, Queue queue1) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    @Bean
    public Binding builder2(FanoutExchange fanoutExchange, Queue queue2) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}
============================================================
@Component
public class ConsumerListener {
    int count1 = 1;
    int count2 = 1;

    @RabbitListener(queues = "simple.queue")/*简单队列监听;工作队列1*/
    public void simplequeue(Object msg) {

        System.err.println("Q1-接收到消息:【" + msg + "】" + count1++);

    }

    @RabbitListener(queues = "simple.queue") /*简单队列监听;工作队列2*/
    public void simplequeue2(Object msg) throws InterruptedException {
        System.out.println("Q2-接收到消息:【" + msg + "】" + count2++);
        Thread.sleep(20);
    }


    //===========================================================================

    @RabbitListener(queues = "fanoutQueue1") /* fanout广播1*/
    public void fanoutQueue1(Object msg) throws InterruptedException {
        System.out.println(" fanout1-接收到消息:【" + msg + "】");
        Thread.sleep(20);
    }

    @RabbitListener(queues = "fanoutQueue2") /* fanout广播2*/
    public void fanoutQueue2(Object msg) throws InterruptedException {
        System.out.println(" fanout2-接收到消息:【" + msg + "】");
        Thread.sleep(20);
    }

    //===========================================================================

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))/*Direct定向路由1*/
    public void listenDirectQueue1(Object msg) {
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))/*Direct定向路由2*/
    public void listenDirectQueue2(Object msg) {
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }


    //===========================================================================


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))/*Topic话题*/
    public void listenTopicQueue2(Object msg) {
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
            key = {"japan.#"}
    ))/*Topic话题*/
    public void listenTopicQueue1(Object msg) {
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue3"),
            exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
            key = {"#.watch"}
    ))/*Topic话题*/
    public void listenTopicQueue3(Object msg) {
        System.out.println("消费者接收到topic.queue3的消息:【" + msg + "】");
    }


}

相关内容

热门资讯

中山东方医院标准化就诊流程:从... 在医疗服务质量不断提升的今天,标准化就诊流程建设已成为医院提升服务效率、改善患者体验的重要抓手。医院...
彩票卖不动了?去年全国彩票收入... 中国彩票收入增速持续放缓。 1月30日,财政部公布2025年12月份全国彩票销售情况。2025年全年...
原创 超... 当消费者为家中购置新物品时,功能之外,产品在“家”中的融入感、协调性如何,正成为越来越重要的考量——...
寒武纪预计2025年至高盈利2... 《科创板日报》1月30日讯(记者 郭辉)寒武纪发布2025年年度业绩预告。 公告显示,寒武纪预计20...
2025年我国基本医保统筹基金... 2025年我国基本医保统筹基金收入约2.95万亿元 新华社北京1月30日电(记者彭韵佳)记者1月3...
美股收盘:三大指数集体收低 多... 财联社1月31日讯(编辑 赵昊)周五(1月30日),美股低开低收,三大指数集体下跌。 截至收盘,道琼...
原创 金... 金价暴跌就该抛? 1月30日今日金价,黄金暴涨5%,踏空的人又哭了。 但你可能不知道,1月29号上海...
谈谈“体育强国”融入“健康中国... 聚焦健康中国 体育(第一健康报道融媒体中心 老柴体悟) 一谈到“健康中国”,我们往往会联想到“长寿...
服用这3类药不宜“多喝水” 提到吃药,“多喝水”似乎是默认的常识,可以帮助药物吞咽、减少对食道刺激、促进药物吸收,但并非所有药物...
为何看中医最好选“早上”? 68岁的陈阿姨常年失眠、乏力,夜里总在凌晨三点惊醒,醒来口干却不想喝水,白天精神萎靡,吃了一年多的安...
沃什提名引爆金银“血洗”!盘中... 美国总统特朗普提名沃什(Kevin Warsh)出任美联储主席引爆了贵金属数十年来最惨烈的抛售。 周...
深夜突发!金价大跳水,日内跌超... 北京时间1月31日凌晨,恐慌性抛售席卷全球贵金属市场。 截至发稿,现货白银日内跌幅扩大至34.67%...
拆解字节公益:新时代的新公益范... 最近,李亚鹏的嫣然天使儿童医院欠租新闻,又一次把“传统公益”推上了风口浪尖。 大家讨论得很热烈,捐...
平衡稳进破解投资难题 农银瑞恒... 当前利率处于低位,A股围绕4100点附近震荡,在此背景下,由农银汇理基金打造的农银瑞恒债券型证券投资...
记者手记|欧元汇率重返1.20... 新华社法兰克福1月30日电 记者手记|欧元汇率重返1.20背后的美元体系失序 新华社记者马悦然 近年...
为便捷付费,还是放弃拥有?订阅... 在“订阅制”日益普及的今天,人们对内容、软件和服务的“拥有权”似乎正逐渐让位于“使用权”。从流媒体平...
联合北大国发院 小红书将发起“... 真正的生活经济,就是让商业回归“为人服务”的本质。小红书方面表示,未来将与北京大学国家发展研究院等智...
“冻品一哥”转行做面包? 出品 | 创业最前线 作者 | 付艳翠 编辑 | 冯羽 美编 | 邢静 审核 | 颂文 最近,西贝与...
机器人“烧钱”也要上春晚打拳 记者 任晓宁 1月底,央视马年春晚彩排现场,宇树科技、银河通用、魔法原子、松延动力等具身智能公司的人...
芳烃与农化率先“突围” A股化... 本报记者 王僖 2026年以来,多个化工品价格强势反弹,并带动A股化工板块“春潮”涌动。 Wind资...