篮球世界杯赌球浅谈Springboot整合RocketMQ使用心得

2019-10-07 02:21 来源:未知

浅谈Springboot整合RocketMQ使用心得,springbootrocketmq

一、阿里云官网---帮助文档

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)

二、代码

1、配置:

public class MqConfig {
  /**
   * 启动测试之前请替换如下 XXX 为您的配置
   */
  public static final String PUBLIC_TOPIC = "test";//公网测试
  public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";
  public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";

  public static final String ACCESS_KEY = "123";
  public static final String SECRET_KEY = "123";
  public static final String TAG = "";
  public static final String THREAD_NUM = "25";//消费端线程数
  /**
   * ONSADDR 请根据不同Region进行配置
   * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
   * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
   */
  public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}

ONSADDR 阿里云用 公有云生产,测试用公网

不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建TOPIC

2、生产者

方式1:

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
  <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
     init-method="start" destroy-method="shutdown">
    <property name="properties">
      <map>
        <entry key="ProducerId" value="" /> <!-- PID,请替换 -->
        <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,请替换 -->
        <entry key="SecretKey" value="" /> <!-- SECRET_KEY,请替换 -->
        <!--PropertyKeyConst.ONSAddr 请根据不同Region进行配置
         公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
         公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
        <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
      </map>
    </property>
  </bean>
</beans>

启动方式1,在使用类的全局里设置:

//初始化生产者
  private ApplicationContext ctx;
  private ProducerBean producer;

  @Value("${producerConfig.enabled}")//开关,spring配置项,true为开启,false关闭
  private boolean producerConfigEnabled;

  @PostConstruct
  public void init(){
    if (true == producerConfigEnabled) {
      ctx = new ClassPathXmlApplicationContext("producer.xml");
      producer = (ProducerBean) ctx.getBean("producer");
    }
  }

PS:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullGC,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown

方式2:配置类(不需要xml)

@Configuration
public class ProducerBeanConfig {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  private ProducerBean producerBean;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  @Bean
  public ProducerBean oneProducer() {
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);

    producerBean.setProperties(properties);
    return producerBean;
  }
}

PS:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3

方式3:(不需要xml)

@Component
public class ProducerBeanSingleTon {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  private static Producer producer;

  private static class SingletonHolder {
    private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();
  }

  private ProducerBeanSingleTon (){}

  public static final ProducerBeanSingleTon getInstance() {
    return SingletonHolder.INSTANCE;
  }

  @PostConstruct
  public void init(){
    // producer 实例配置初始化
    Properties properties = new Properties();
    //您在控制台创建的Producer ID
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    //设置发送超时时间,单位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
    // 设置 TCP 接入域名(此处以公共云生产环境为例)
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
    producer = ONSFactory.createProducer(properties);
    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
    producer.start();
  }

  public Producer getProducer(){
    return producer;
  }
}

spring配置

spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

consumerConfig.enabled = true

producerConfig.enabled = true #方式1:

scheduling.enabled = false

#方式2、3:rocketMQ u516Cu7F51u914Du7F6E
openservices.ons.producerBean.producerId = pid
openservices.ons.producerBean.accessKey = 
openservices.ons.producerBean.secretKey = 

openservices.ons.producerBean.ONSAddr = 公网、杭州公有云生产

方式1投递消息代码:

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有异常,休眠1秒
   }

方式2投递消息代码:(可以每发1000个启动/关闭一次)

   producerBean.start();
try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有异常,休眠1秒
   }

   producerBean.shutdown();

方式3:投递消息

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   Producer producer = ProducerBeanSingleTon.getInstance().getProducer();
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”;

   } else {
     logger.warn("DoubleElevenMidService.sendResult is null.........");
   }
   } catch (Exception e) {
     logger.error("DoubleElevenMidService Thread.sleep 1 s___error is " e.getMessage(), e);
     Thread.sleep(1000);//如果有异常,休眠1秒
   }

发送消息的代码一定要捕获异常,不然会重复发送。

这里的TOPIC用自己创建的,elevenMessage是要发送的内容,我这里是自己建的对象

3、消费者

配置启动类:

@Configuration
@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)
public class ConsumerConfig {

  private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());

  @Bean
  public Consumer consumerFactory(){//不同消费者 这里不能重名
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
    //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new对应的监听器
    consumer.start();
    logger.info("ConsumerConfig start success.");


    return consumer;

  }
}

CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置

创建消息监听器类,消费消息:

@Component
public class MessageListener implements MessageListener {
  private Logger logger = LoggerFactory.getLogger("remind");

  protected static ElevenReposity elevenReposity;
  @Resource
  public void setElevenReposity(ElevenReposity elevenReposity){
    MessageListener .elevenReposity=elevenReposity;
  }


  @Override
  public Action consume(Message message, ConsumeContext consumeContext) {

    if(message.getTopic().equals("自己的TOPIC")){//避免消费到其他消息 json转换报错
      try {

      byte[] body = message.getBody();
      String res = new String(body);

      //res 是生产者传过来的消息内容

        //业务代码

      }else{
        logger.warn("!");
      }

      } catch (Exception e) {
        logger.error("MessageListener.consume error:"   e.getMessage(), e);
      }

      logger.info("MessageListener.Receive message”);
      //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
      return Action.CommitMessage;
    }else{
      logger.warn();
      return Action.ReconsumeLater;
    }

  }

注意,由于消费者是多线程的,所以对象要用static set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量

篮球世界杯赌球 1

消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等

重置消费位点可以清空所有消息

三、注意事项

1、发送的消息体 最大为256KB

2、消息最多存在3天

3、消费端默认线程数是20

4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s

5、本地测试或启动的时候,把ONSADDR换成公网,不然报错无法启动

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持帮客之家。

http://www.bkjia.com/Javabc/1296232.htmlwww.bkjia.comtruehttp://www.bkjia.com/Javabc/1296232.htmlTechArticle浅谈Springboot整合RocketMQ使用心得,springbootrocketmq 一、阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.htmlspm=5176.doc29535.6.555.WWTIU...

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

下载地址1:

客户端

#-- encoding:utf-8 -*-*

__author__ = 'shouke'

from kafka.client import KafkaClient

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

# 获取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

print('broker: ', broker)

print('broker nodeId: ', broker.nodeId)

# 获取主题的所有分区

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic

print(partitions)

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

运行结果:

broker: BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId: 0

{0}

{'MY_TOPIC1': [0]}

API及常用参数说明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers–'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)

client_id – 客户端名称,默认值: ‘kafka-python-{version}’

request_timeout_ms – 客户端请求超时时间,单位毫秒。默认值: 30000.

参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

brokers()

获取所有broker元数据

available_partitions_for_topic

返回主题的所有分区

参考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

zookeeper-3.4.13.tar.gz

kafka_python-1.4.4-py2.py3-none-any.whl

消费者

#-- encoding:utf-8 -*-*

__author__ = 'shouke'

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

consumer = KafkaConsumer('MY_TOPIC1',
bootstrap_servers=['127.0.0.1:9092'],
#auto_offset_reset='', auto_offset_reset='latest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费 enable_auto_commit=True, # 自动提交消费者的offset auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔 group_id='MY_GROUP1',
consumer_timeout_ms= 10000, # 如果10秒内kafka中没有可供消费的数据,自动退出 client_id='consumer-python3' )

for msg in consumer:

print

print('topic: ', msg.topic)

print('partition: ', msg.partition)

print('key: ', msg.key, 'value: ', msg.value)

print('offset:', msg.offset)

print('headers:', msg.headers)

# Get consumer metrics

metrics = consumer.metrics()

print

运行效果

通过assign、subscribe两者之一为消费者设置消费的主题

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

auto_offset_reset='latest',

enable_auto_commit=True, #篮球世界杯赌球, 自动提交消费数据的offset

consumer_timeout_ms= 10000, # 如果1秒内kafka中没有可供消费的数据,自动退出

value_deserializer=lambda m: json.loads(m.decode('ascii')), #消费json 格式的消息

client_id='consumer-python3'

)

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next

# print

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

print

API及常用参数说明:

class kafka.KafkaConsumer(*topics, **configs)

*topics – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。

client_id – 客户端名称,默认值: ‘kafka-python-{version}’

group_id(str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None

auto_offset_reset – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。

enable_auto_commit – 如果为True,将自动定时提交消费者offset。默认为True。

auto_commit_interval_ms – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。

value_deserializer - 携带原始消息value并返回反序列化后的value

subscribe, pattern=None, listener=None)

订阅需要的主题

topics – 需要订阅的主题列表

pattern – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。

metrics(raw=False)

获取消费者性能指标。

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

生产者

#-- encoding:utf-8 -*-*

__author__ = 'shouke'

from kafka import KafkaProducer

import json

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range:

producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

# Block直到单条消息发送完或者超时

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print

# Block直到所有阻塞的消息发送到网络

# 注意: 该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It is really only useful if you configure internal batching using linger_ms

# 序列化json数据

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps.encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

# 序列化字符串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range:

producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

# 消息记录携带header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

# 获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时

metrics = producer.metrics()

print

producer.flush()

实践中遇到错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解决方案如下:

进入到配置目录,编辑server.properties文件,

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

https://pypi.org/project/kafka-python/#description

下载地址2:

下载地址2:

下载地址2:

下载地址2:

1.测试环境

下载地址1:

下载地址1:

下载地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

python 3.4

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

说明:

构建生产者对象时,可通过compression_type 参数指定由对应生产者生产的消息数据的压缩方式,或者在producer.properties配置中配置compression.type参数。

TAG标签:
版权声明:本文由美洲杯赌球发布于计算机教程,转载请注明出处:篮球世界杯赌球浅谈Springboot整合RocketMQ使用心得