当前位置:   article > 正文

kafka-消费者组-点对点测试

kafka-消费者组-点对点测试

1、点对点测试

1.1、获取 kafka-consumer-groups.sh 的帮助信息

[root@localhost ~]# kafka-consumer-groups.sh --help
Missing required argument "[bootstrap-server]"
Option                                  Description                            
------                                  -----------                            
--all-groups                            Apply to all consumer groups.          
--all-topics                            Consider all topics assigned to a      
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to. 
  connect to>                                                                  
--by-duration <String: duration>        Reset offsets to offset by duration    
                                          from current timestamp. Format:      
                                          'PnDTnHnMnS'                         
--command-config <String: command       Property file containing configs to be 
  config property file>                   passed to Admin Client and Consumer. 
--delete                                Pass in groups to delete topic         
                                          partition offsets and ownership      
                                          information over the entire consumer 
                                          group. For instance --group g1 --    
                                          group g2                             
--delete-offsets                        Delete offsets of consumer group.      
                                          Supports one consumer group at the   
                                          time, and multiple topics.           
--describe                              Describe consumer group and list       
                                          offset lag (number of messages not   
                                          yet processed) related to given      
                                          group.                               
--dry-run                               Only show results without executing    
                                          changes on Consumer Groups.          
                                          Supported operations: reset-offsets. 
--execute                               Execute operation. Supported           
                                          operations: reset-offsets.           
--export                                Export operation execution to a CSV    
                                          file. Supported operations: reset-   
                                          offsets.                             
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV 
                                          file.                                
--group <String: consumer group>        The consumer group we wish to act on.  
--help                                  Print usage information.               
--list                                  List all consumer groups.              
--members                               Describe members of the group. This    
                                          option may be used with '--describe' 
                                          and '--bootstrap-server' options     
                                          only.                                
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members                              
--offsets                               Describe the group and list all topic  
                                          partitions in the group along with   
                                          their offset lag. This is the        
                                          default sub-action of and may be     
                                          used with '--describe' and '--       
                                          bootstrap-server' options only.      
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          offsets                              
--reset-offsets                         Reset offsets of consumer group.       
                                          Supports one consumer group at the   
                                          time, and instances should be        
                                          inactive                             
                                        Has 2 execution options: --dry-run     
                                          (the default) to plan which offsets  
                                          to reset, and --execute to update    
                                          the offsets. Additionally, the --    
                                          export option is used to export the  
                                          results to a CSV format.             
                                        You must choose one of the following   
                                          reset specifications: --to-datetime, 
                                          --by-period, --to-earliest, --to-    
                                          latest, --shift-by, --from-file, --  
                                          to-current.                          
                                        To define the scope use --all-topics   
                                          or --topic. One scope must be        
                                          specified unless you use '--from-    
                                          file'.                               
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset  
                                          by 'n', where 'n' can be positive or 
                                          negative.                            
--state [String]                        When specified with '--describe',      
                                          includes the state of the group.     
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          state                                
                                        When specified with '--list', it       
                                          displays the state of all groups. It 
                                          can also be used to list groups with 
                                          specific states.                     
                                        Example: --bootstrap-server localhost: 
                                          9092 --list --state stable,empty     
                                        This option may be used with '--       
                                          describe', '--list' and '--bootstrap-
                                          server' options only.                
--timeout <Long: timeout (ms)>          The timeout that can be set for some   
                                          use cases. For example, it can be    
                                          used when describing the group to    
                                          specify the maximum amount of time   
                                          in milliseconds to wait before the   
                                          group stabilizes (when the group is  
                                          just created, or is going through    
                                          some changes). (default: 5000)       
--to-current                            Reset offsets to current offset.       
--to-datetime <String: datetime>        Reset offsets to offset from datetime. 
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'    
--to-earliest                           Reset offsets to earliest offset.      
--to-latest                             Reset offsets to latest offset.        
--to-offset <Long: offset>              Reset offsets to a specific offset.    
--topic <String: topic>                 The topic whose consumer group         
                                          information should be deleted or     
                                          topic whose should be included in    
                                          the reset offset process. In `reset- 
                                          offsets` case, partitions can be     
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the          
                                          partition to be included in the      
                                          process. Reset-offsets also supports 
                                          multiple topic inputs.               
--verbose                               Provide additional information, if     
                                          any, when describing the group. This 
                                          option may be used with '--          
                                          offsets'/'--members'/'--state' and   
                                          '--bootstrap-server' options only.   
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members --verbose                    
--version                               Display Kafka version.  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124

1.2、列出所有的 消费者组

console-consumer-92430 是没有指定组的消费者,默认自己是一组如果没有指定组的消费者长时间关闭,会自动删除组
消费者组:每个消费者不指定组默认自己为一组,指定group相同的消费者他们为一组
一个消费者组同一个消息只能获取消费一次(组内的多个消费者只有一个会消费该消息)
同一个消费者组内的多个消费者 消费一个主题的消息时就是点对点,每个消息消费一次

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --list
console-consumer-92430
  • 1
  • 2

1.3、创建消费者1并指定组 my_group1

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic my_topic1 --group my_group1

  • 1
  • 2
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       0          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.4、创建消费者2并指定组 my_group1

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic my_topic1 --group my_group1

  • 1
  • 2
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       0          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.5、创建消费者3并指定组 my_group1

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic my_topic1 --group my_group1

  • 1
  • 2
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       0          0               0               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.6、创建生产者发送消息到 my_topic1 主题

1.6.1、发送第一条消息romantic

发送单词romantic

[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9092 --topic my_topic1
>romantic
>
  • 1
  • 2
  • 3

在这里插入图片描述

发送一个消息romantic后发现消费者3中的 current-offset 和 log-end-offset 都变成了1

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       0          1               1               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.6.2、发送第二条消息access

在这里插入图片描述

发送一个消息access后发现消费者2中的 current-offset 和 log-end-offset 都变成了1

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          1               1               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       0          1               1               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.6.3、发送第三条消息ability

在这里插入图片描述

发送一个消息ability后发现消费者3中的 current-offset 和 log-end-offset 都变成了2,因为第一次消息也是消费者3接收

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          1               1               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       0          2               2               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.6.4、发送第四条消息accompany

在这里插入图片描述

发送一个消息accompany后发现消费者2中的 current-offset 和 log-end-offset 都变成了2,因为第二次消息也是消费者2接收

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          2               2               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       0          2               2               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 /192.168.74.148 consumer-my_group1-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.6.5、发送第五条消息abandon

在这里插入图片描述

发送一个消息abandon后发现消费者1中的 current-offset 和 log-end-offset 都变成了1,因为消费者1是第一次接收到消息

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.74.148:9092 --group my_group1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          1               1               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       2          2               2               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 /192.168.74.148 consumer-my_group1-1
my_group1       my_topic1       0          2               2               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 /192.168.74.148 consumer-my_group1-1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/642104
推荐阅读
相关标签
  

闽ICP备14008679号