赞
踩
// Starts a dedicated thread that pulls messages from `consumer`, decodes each payload as
// UTF-8 and passes it to `messageHandler`. A failed message is retried according to
// messageHandler.getErrorMaxRetries(); offsets are committed whenever the handler asks for it.
Thread consumerThread = new Thread(new Runnable() {

    /**
     * Consumes messages until the consumer reports no further message, the thread is
     * interrupted, or an unexpected exception escapes the loop. The consumer is shut
     * down in every case.
     */
    @Override
    public void run() {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = null;
        try {
            // Thread.interrupted() also clears the interrupt status, so the shutdown in the
            // finally block runs with a clean flag.
            while (consumer.hasNext() && !Thread.interrupted()) {
                messageAndMetadata = consumer.getData();
                try {
                    messageHandler.onMsg(new String(messageAndMetadata.message(), "UTF-8"));
                } catch (Exception ee) {
                    if (!recoverFromFailure(messageAndMetadata, ee)) {
                        // Interrupted while waiting to retry: stop consuming without
                        // counting or committing the message that was not processed.
                        break;
                    }
                }

                messageHandler.increment();

                // should commit offset or not
                if (messageHandler.shouldCommit()) {
                    consumer.commitOffsets();
                    messageHandler.setLastTimeCommit(System.currentTimeMillis());
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Successfully commitOffsets, topic:"
                                + messageAndMetadata.topic()
                                + " partition: "
                                + messageAndMetadata.partition()
                                + " offset: "
                                + messageAndMetadata.offset());
                    }
                }
            }
        } catch (Exception e) {
            if (messageAndMetadata == null) {
                // Failure before any message was fetched: there is no message context to
                // report, but the exception itself must still reach the log.
                LOG.error(String.format("DafkaConsumer thread for topic:%s failed before any message was read", topic), e);
            } else {
                try {
                    LOG.error(Joiner.on("").join("topic:", messageAndMetadata.topic(),
                            " partition:", messageAndMetadata.partition(),
                            " msgContent:", new String(messageAndMetadata.message(), "UTF-8")), e);
                } catch (UnsupportedEncodingException e1) {
                    LOG.error(e1.getMessage(), e1);
                    // Do not lose the exception that actually terminated the loop.
                    LOG.error(e.getMessage(), e);
                }
            }
        } finally {
            LOG.info(String.format("DafkaConsumer thread for topic:%s will give up, try me later...", topic));
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }

    /**
     * Handles a message whose first processing attempt threw {@code cause}.
     *
     * When the handler's max-retries value is -1 the failure is only logged. Otherwise the
     * message is re-submitted up to the configured number of times (values outside
     * 1..DEFAULT_MAX_RETRIES fall back to DEFAULT_MAX_RETRIES), sleeping
     * RETRY_INTERVAL_ARR[i] seconds before attempt i, and stopping at the first success.
     *
     * @param failed the message that could not be processed
     * @param cause  the exception thrown by the first attempt
     * @return {@code true} if consumption should continue, {@code false} if the thread was
     *         interrupted while retrying and should stop
     * @throws UnsupportedEncodingException if the payload cannot be decoded for the
     *         no-retry log message
     */
    private boolean recoverFromFailure(MessageAndMetadata<byte[], byte[]> failed, Exception cause)
            throws UnsupportedEncodingException {
        long retries = messageHandler.getErrorMaxRetries();
        if (retries == -1) {
            LOG.error(String.format("[NO RETRY] Processing msg:[%s] met error, please check it.", new String(failed.message(), "UTF-8")), cause);
            return true;
        }

        // Record the original failure; otherwise only the retry failures would be logged.
        LOG.error(String.format("Processing msg for topic:%s met error, going to retry.", topic), cause);

        // range from 1 ~ DEFAULT_MAX_RETRIES do as it, other retry DEFAULT_MAX_RETRIES times
        if (retries <= 0 || retries > DEFAULT_MAX_RETRIES) {
            retries = DEFAULT_MAX_RETRIES;
        }

        // NOTE(review): assumes RETRY_INTERVAL_ARR has at least DEFAULT_MAX_RETRIES entries - confirm.
        for (int i = 0; i < retries; i++) {
            LOG.info(String.format("[USING RETRY] %s times, for consuming topic:%s, sleeping %s seconds", (i+1), topic, RETRY_INTERVAL_ARR[i]));
            try {
                Thread.sleep(RETRY_INTERVAL_ARR[i]*1000);
                messageHandler.onMsg(new String(failed.message(), "UTF-8"));
                // Success: stop here so the message is not processed again by the
                // remaining attempts.
                return true;
            } catch (InterruptedException interrupted) {
                // An interrupt is a request to stop this thread; honour it instead of
                // treating it as one more failed attempt.
                LOG.info(String.format("Interrupted while retrying for topic:%s, stop consuming.", topic));
                return false;
            } catch (Exception e1) {
                LOG.error(e1.getMessage(), e1);
            }
        }

        LOG.error(String.format("[RETRY EXHAUSTED] Gave up after %s retries for consuming topic:%s", retries, topic), cause);
        return true;
    }
});
consumerThread.setName("DafkaConsumerThread-"
        + properties.getProperty("topic"));
consumerThread.start();
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。