rabbitmq代码
admin
2024-02-21 21:50:24
0

spring:
  rabbitmq:
    host:  # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: zhangsan # 用户名
    password: 1234 # 密码
=============================================
public class User implements Serializable {
    private static final long serliaVersionUid = 1L;
    private String name;

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    private int age;

    public static long getSerliaVersionUid() {
        return serliaVersionUid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
===========================================================
生产者
@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){

        return new Jackson2JsonMessageConverter();
    }
}

=========================================================
@SpringBootTest(classes = PublisherApplication.class)
@RunWith(SpringRunner.class)
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test/*简单消息模型*/
    public void simple() {
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        //需要先申明一个queue
        rabbitTemplate.convertAndSend(queueName, message);
    }

    @Test/*工作消息模型*/
    public void worker() {
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        //需要先申明一个queue
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, "发送的第" + i + "条消息:" + message);
        }
    }

    @Test/*简单消息模型*/
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.238.128");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("zhangsan");
        factory.setPassword("1234");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }

    @Test/*工作消息模型*/
    public void testSendWorkerMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.238.128");
        factory.setPort(5672);
        factory.setVirtualHost("/itcast");
        factory.setUsername("zhangsan");
        factory.setPassword("1234");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        for (int i = 0; i < 50; i++) {
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }


    /*发布/订阅*/
    @Test/*Fanout广播*/
    public void fanout() {
        // 消息
        String message = "hello,Fanout广播!";
        //交换机
        String fanoutName = "fanoutExchange";
        // 发送消息
        //需要先申明一个queue
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(fanoutName, "", message);
        }
    }


    @Test/*Direct定向路由*/
    public void direct() {
        // 消息
        String message = "hello,Direct定向路由!";
        //交换机
        String directName = "directExchange";
        // 发送消息
        //需要先申明一个queue
        User user = new User("张三", 22);
        rabbitTemplate.convertAndSend(directName, "red", user);
        rabbitTemplate.convertAndSend(directName, "blue", user);
        rabbitTemplate.convertAndSend(directName, "yellow", user);
    }

    @Test/*Topic话题*/
    public void topic() {
        // 消息
        String message = "hello,Topic话题!";
        //交换机
        String topicName = "topicExchange";
        // 发送消息
        //需要先申明一个queue
        rabbitTemplate.convertAndSend(topicName, "china.watch", message);
        rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
        rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
    }


    @Test/*Topic话题*/
    public void topic2() {
        // 消息
        String message = "hello,Topic话题!";
        //交换机
        String topicName = "topicExchange2";
        // 发送消息
        //需要先申明一个queue
        rabbitTemplate.setExchange(topicName);
        rabbitTemplate.convertAndSend(topicName, "china.watch", message);
        rabbitTemplate.convertAndSend(topicName, "japan.watch", message);
        rabbitTemplate.convertAndSend(topicName, "china.niubi", message);
    }
    
    
============================================================
消费者
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
      @Bean
    public MessageConverter jsonMessageConverter(){

        return new Jackson2JsonMessageConverter();
    }
}
==========================================================
@Component
public class FanoutConfig {
    /*定义声明交换机对垒,交换机绑定*/
    //1定义一个交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    //2.定义声明一个队列
    @Bean
    public Queue queue1() {
        return new Queue("fanoutQueue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("fanoutQueue2");
    }

    //3.绑定
    @Bean
    public Binding builder1(FanoutExchange fanoutExchange, Queue queue1) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    @Bean
    public Binding builder2(FanoutExchange fanoutExchange, Queue queue2) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}
============================================================
@Component
public class ConsumerListener {
    int count1 = 1;
    int count2 = 1;

    @RabbitListener(queues = "simple.queue")/*简单队列监听;工作队列1*/
    public void simplequeue(Object msg) {

        System.err.println("Q1-接收到消息:【" + msg + "】" + count1++);

    }

    @RabbitListener(queues = "simple.queue") /*简单队列监听;工作队列2*/
    public void simplequeue2(Object msg) throws InterruptedException {
        System.out.println("Q2-接收到消息:【" + msg + "】" + count2++);
        Thread.sleep(20);
    }


    //===========================================================================

    @RabbitListener(queues = "fanoutQueue1") /* fanout广播1*/
    public void fanoutQueue1(Object msg) throws InterruptedException {
        System.out.println(" fanout1-接收到消息:【" + msg + "】");
        Thread.sleep(20);
    }

    @RabbitListener(queues = "fanoutQueue2") /* fanout广播2*/
    public void fanoutQueue2(Object msg) throws InterruptedException {
        System.out.println(" fanout2-接收到消息:【" + msg + "】");
        Thread.sleep(20);
    }

    //===========================================================================

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))/*Direct定向路由1*/
    public void listenDirectQueue1(Object msg) {
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))/*Direct定向路由2*/
    public void listenDirectQueue2(Object msg) {
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }


    //===========================================================================


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))/*Topic话题*/
    public void listenTopicQueue2(Object msg) {
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
            key = {"japan.#"}
    ))/*Topic话题*/
    public void listenTopicQueue1(Object msg) {
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue3"),
            exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC),
            key = {"#.watch"}
    ))/*Topic话题*/
    public void listenTopicQueue3(Object msg) {
        System.out.println("消费者接收到topic.queue3的消息:【" + msg + "】");
    }


}

相关内容

热门资讯

瞄准未获Mythos使用权限的... 法国AI初创公司Mistral AI正与欧洲多家 银行洽谈,计划部署其对标Anthropic PBC...
一件代发怎么找云仓?按这四步走... 想做无货源电商,或者想把自己从打包发货中解放出来,“一件代发”是电商卖家无法绕不开的。问题来了,市面...
优化房地产政策促市场热度提升 4月28日召开的中共中央政治局会议指出,要努力稳定房地产市场。近期,多城市调整优化房地产调控政策,从...
“视听北京·金融作品征集”活动... 2026年5月13日,第六届中国(北京)广电媒体融合发展大会“金融新视界・视听新动能”金融与视听产业...
黄金走势图蓄势反弹 低成本布局... 来源:环球市场播报 如果你一直在关注SPDR 黄金份额 ETF(GLD),会发现金价近期处于盘整震荡...
马斯克点赞宇树载人机甲:很酷 站长之家(ChinaZ.com)5月13日 消息:宇树科技在5月12日扔出了一颗重磅炸弹。这家公司正...
「数据看盘」IM期指空头大幅加... 龙虎榜方面,红板科技获多家量化资金和游资的关注,获一家量化(摩根大通中国银城中路)买入0.62亿,遭...
原创 今... 2026年5月13日金价:大家不必继续盲目等待了!接下来,金价有可能会重演历史! 国内黄金价格继续处...
“中国最大AI包工头”冲击IP... 记者|鄢银婵 编辑|何小桃 廖丹 杜恒峰 校对|金冥羽 2026年4月29日,上海基流科技股份有限公...
白敬亭沈腾成立开门见衫公司 大象新闻记者 林林 天眼查App显示,5月12日,上海开门见衫品牌管理有限公司成立,法定代表人为上官...
原创 从... 今天来给大家聊一下中国磷化铟。2026年第一季度,全球前六大光模块厂商,中国占据四席;800G和1....
财报会释放重要信号,吴泳铭解读... 新京报贝壳财经讯(记者程子姣)5月13日,阿里巴巴集团发布2026财年第四季度与全年财报。在当晚的财...
抖音让大流量转化为大消费 “3、2、1,上链接!”不再是一句直播间的卖货口号,而是吃喝玩乐一站式服务的标语。文旅风光、特色餐饮...
千亿资本开支换来自由现金流转负... AI行业的竞争已从“模型竞赛”转入“算力消耗战”,为了应对这一趋势,阿里也正在大举进行新一轮AI基础...
原创 美... 美国诺克斯堡金库,那座号称囤积了4500吨黄金的神秘仓库,再次成为舆论焦点。而美国总统特朗普,这位以...
东方嘉富人寿:童超当选公司董事... 北京商报讯(记者 李秀梅)5月13日,东方嘉富人寿保险有限公司(以下简称“东方嘉富人寿”)公告,根据...
布朗32分孙铭徽复出 浙江广厦... 【搜狐体育战报】北京时间5月13日CBA季后赛,主场作战的浙江浙商证券以91-67击败山西汾酒,伤缺...
原创 外... 外汇储备被网友戏称为“金融核武”,因为其不仅在稳定汇率、保障经济安全上发挥关键作用,还有助于推动人民...
美股首只纯存储ETF,刷新华尔... 财联社5月13日讯(编辑 史正丞)近期存储芯片板块的猛烈上涨,使得一只成立不到6周的ETF成为华尔街...