`
webcode
  • 浏览: 5947454 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

AcitveMQ编程实践

 
阅读更多

本文主要介绍了ActiveMQ的编程模型,常用的类。以及一个通用的MQ编程模式。方便初学者快速掌握 ActiveMQ的编程方法。

一、 开发 JMS的步骤

一个 JMS 应用是几个 JMS 客户端交换消息,开发 JMS 客户端应用由以下几步构成:

1、用 JNDI 得到 ConnectionFactory 对象;

2、用 ConnectionFactory 创建 Connection 对象;

3、用 Connection 对象创建一个或多个 JMS Session

4、用 JNDI 得到目标队列或主题对象,即 Destination 对象;

5、用 Session Destination 创建 MessageProducer MessageConsumer

6、通知 Connection 开始传送消息。

二、 编程模型

1、 ConnectionFactory

要初始化 JMS,则需要使用连接工厂。客户端通过创建 ConnectionFactory建立到 ActveMQ的连接,一个连接工厂封装了一组连接配置参数,这组参数在配置ActiveMQ时已经定义,例如brokerURL参数,此参数传入的是ActiveMQ服务地址和端口,支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为 tcp://localhost:61613

ActiveMQConnectionFactory构造方法:

ActiveMQConnectionFactory();

ActiveMQConnectionFactory(String brokerURL);

ActiveMQConnectionFactory(String userName, String password, String b rokerURL) ;

ActiveMQConnectionFactory(String userName, String password, URI brok erURL) ;

ActiveMQConnectionFactory(URI brokerURL);

其中 brokerURLActiveMQ服务地址和端口。

例如:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192. 168.0.135:61616");

或者

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

connectionFactory. setBrokerURL("tcp://192.168.0.135:61616");

2Connection

在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,它是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用 JMS提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放TCP /IP套接字”。该文档还指出 Connection应该是进行客户端身份验证的地方,除了其他一些事项外,客户端还可以指定惟一标志符。

当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或多个的Session。当一个程序执行完成后,必须关闭之前创建的Connection,否则 ActiveMQ不能释放资源,关闭一个Connection同样也关闭了 SessionMessageProducerMessageConsumer

Connection支持并发。

2.1、创建Connection

ActiveMQConnectionFactory方法:

Connection createConnection()

Connection createConnection(String userName, String password);

2.2、开启 Connection

void start();

如:connection.start();

2.3关闭 Connection

void close();

如:connection.close();

3、 Session

一旦从ConnectionFactory中获得一个Connection,就必须从Connection中创建一个或者多个SessionSession是一个发送或接收消息的线程,可以使用Session创建 MessageProducerMessageConsumerMessage

Session可以被事务化,也可以不被事务化。通常可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted为使用事务标识,acknowledgeMode为签收模式。

如:Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

4、 Destination

Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。

PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题。在程序中可以使用多个QueueTopic

Queue createQueue(String queueName);

TemporaryQueue createTemporaryQueue();

Topic createTopic(String topicName);

TemporaryTopic createTemporaryTopic();

如: Destination destination = session.createQueue("TEST.FOO");

5、 MessageProducer

MessageProducer是一个由Session创建的对象,用来向Destination发送消息。

5.1、创建MessageProducer

MessageProducer createProducer(Destination destination);

如:MessageProducer producer = session.createProducer(destination);

5.2 发送消息

void send(Destination destination, Message message);

void send(Destination destination, Message message, int deliveryMode, in tpriority, long timeToLive);

void send(Message message);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode为传送模式,priority为消息优先级,timeToLive为消息过期时间。

如:producer.send(message);

6、 MessageConsumer

MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。

6.1、创建MessageConsumer

MessageConsumer createConsumer(Destination destination);

MessageConsumer createConsumer(Destination destination, String messageSelector);

MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);

TopicSubscriber createDurableSubscriber(Topic topic, String name);

TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);

其中messageSelector为消息选择器;noLocal标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。

如:MessageConsumer consumer = session.createConsumer(destination);

6.2、消息的同步与异步接收

消息的同步接收是指客户端主动去接收消息,客户端可以采用MessageConsumerreceive方法去接收下一个消息。

消息的异步接收是指当消息到达时,ActiveMQ主动通知客户端。客户端可以通过注册一个实现 MessageListener接口的对象到MessageConsumerMessageListener只有一个必须实现的方法onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。

Message receive()

Message receive(long timeout)

Message receiveNoWait()

其中timeout为等待时间,单位为毫秒。

或者实现MessageListener接口,每当消息到达时,ActiveMQ会调用MessageListener中的

onMessage函数。

如:Message message = consumer.receive();

6.3、消息选择器

JMS提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。

消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为 MessageConsumer创建的一部分。

如:public final String SELECTOR = “JMSType = ‘TOPIC_PUBLISHER’”;

该选择器检查了传入消息的JMSType属性,并确定了这个属性的值是否等于 TOPIC _PUBLISHER。如果相等,则消息被消费;如果不相等,那么消息会被忽略。

分享到:
评论

相关推荐

    AcitveMQ入门案例代码        

    JMS之ActiveMQ入门案例java工程代码,包括点对点消息模式,发布者--订阅者消息模式。

    AcitveMQ消息队列.pdf

    AcitveMQ消息队列

    JMS消息服务AcitveMQ的配置和测试小例子

    NULL 博文链接:https://flyer2010.iteye.com/blog/662047

    android 开发,acitvemq

    android开发,订阅activemq,然后向activemq发送消息,也可收消息,完整代码,可运行

    java实现的消息中间件之AcitveMQ详解,学习学习

    KahaDB存储 KahaDB他是默认的持久化策略,所有消息都会顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对...

    ActiveMQ使用Demo

    ActiveMQ使用的一个小demo,助你快速入门。

    Activemq原理文档

    本文档总结了Activemq的所有特性,包括整体架构、消息传输原理、部署、消息存储和通讯机制,文字结合图示,读了这个文档,会对Acitvemq有个非常透彻的了解

    activeMQ消息中间件

    AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。

    activemq小心得.docx

    ActiveMQ小心的,帮助你了解 acitvemq基本设置等,很好的。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

    工具使用篇——python操作ActiveMq

    2、本机需要安装acitvemq,安装过程参考[https://blog.csdn.net/weixin_41806489/article/details/104997519] (https://www.csdn.net/). 注意: 一般用python是通过stomp协议进行通讯 操作过程 1、生产方 prublishi....

    ActiveMQ消息传送机制以及ACK机制详解

    AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。Producer客户端使用来发送消息的,Consumer客户端...

    ActiveMQ订阅模式持久化实现

    一个订阅通道,支持多个客户端监听,当某个客户端掉线后,再上线的时候可以收到它没有接收到的消息。

Global site tag (gtag.js) - Google Analytics