您好,欢迎来到汇智旅游网。
搜索
您的当前位置:首页kafka topicPartitions问题

kafka topicPartitions问题

来源:汇智旅游网

当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息

现象如下:

2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4
@KafkaListener(groupId = "test2",topicPartitions = {@TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) {
    logger.info("==[cousumerA]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = {@TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) {
    logger.info("==[cousumerB]==" + msg);
}

@KafkaListener(groupId = "test2",topicPartitions = {@TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) {
    logger.info("==[cousumerC]==" + msg);
}

生产者


    @Autowired
    KafkaTemplate kafkaTemplate;

    int i =0;
    @Scheduled(fixedRate = 2000)
    public void sendTest3() {
        kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
    }

如果采用如下方式,则只会被消费一次

    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerD(String msg) {
        logger.info("==[cousumerD]==" + msg);
    }



    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerE(String msg) {
        logger.info("==[cousumerE]==" + msg);
    }


    @KafkaListener(topics =  "hzl.test.aaa",groupId = "test2",)
    public void cousumerF(String msg) {
        logger.info("==[cousumerF]==" + msg);
    }

另外,如何监听topic中最新的消息

auto-offset-reset: latest 

这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- hzar.cn 版权所有 赣ICP备2024042791号-5

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务