本文主要介绍了ActiveMQ的编程模型,常用的类。以及一个通用的MQ编程模式。方便初学者快速掌握 ActiveMQ的编程方法。
一、 开发 JMS的步骤
一个 JMS 应用是几个 JMS 客户端交换消息,开发 JMS 客户端应用由以下几步构成:
1、用 JNDI 得到 ConnectionFactory 对象;
2、用 ConnectionFactory 创建 Connection 对象;
3、用 Connection 对象创建一个或多个 JMS Session;
4、用 JNDI 得到目标队列或主题对象,即 Destination 对象;
5、用 Session 和 Destination 创建 MessageProducer 和 MessageConsumer;
6、通知 Connection 开始传送消息。
二、 编程模型
1、 ConnectionFactory
要初始化 JMS,则需要使用连接工厂。客户端通过创建 ConnectionFactory建立到 ActveMQ的连接,一个连接工厂封装了一组连接配置参数,这组参数在配置ActiveMQ时已经定义,例如brokerURL参数,此参数传入的是ActiveMQ服务地址和端口,支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为 tcp://localhost:61613。
ActiveMQConnectionFactory构造方法:
ActiveMQConnectionFactory();
ActiveMQConnectionFactory(String brokerURL);
ActiveMQConnectionFactory(String userName, String password, String b rokerURL) ;
ActiveMQConnectionFactory(String userName, String password, URI brok erURL) ;
ActiveMQConnectionFactory(URI brokerURL);
其中 brokerURL为ActiveMQ服务地址和端口。
例如:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192. 168.0.135:61616");
或者
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory. setBrokerURL("tcp://192.168.0.135:61616");
|
2、Connection
在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,它是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用 JMS提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放TCP /IP套接字”。该文档还指出 Connection应该是进行客户端身份验证的地方,除了其他一些事项外,客户端还可以指定惟一标志符。
当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或多个的Session。当一个程序执行完成后,必须关闭之前创建的Connection,否则 ActiveMQ不能释放资源,关闭一个Connection同样也关闭了 Session,MessageProducer和MessageConsumer。
Connection支持并发。
2.1、创建Connection
ActiveMQConnectionFactory方法:
Connection createConnection();
Connection createConnection(String userName, String password);
|
2.2、开启 Connection
void start();
如:connection.start();
|
2.3关闭 Connection
void close();
如:connection.close();
|
3、 Session
一旦从ConnectionFactory中获得一个Connection,就必须从Connection中创建一个或者多个Session。Session是一个发送或接收消息的线程,可以使用Session创建 MessageProducer,MessageConsumer和Message。
Session可以被事务化,也可以不被事务化。通常可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。
Session createSession(boolean transacted, int acknowledgeMode);
其中transacted为使用事务标识,acknowledgeMode为签收模式。
如:Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
4、 Destination
Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。
在PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题。在程序中可以使用多个Queue和Topic。
Queue createQueue(String queueName);
TemporaryQueue createTemporaryQueue();
Topic createTopic(String topicName);
TemporaryTopic createTemporaryTopic();
如: Destination destination = session.createQueue("TEST.FOO");
|
5、 MessageProducer
MessageProducer是一个由Session创建的对象,用来向Destination发送消息。
5.1、创建MessageProducer
MessageProducer createProducer(Destination destination);
如:MessageProducer producer = session.createProducer(destination);
|
5.2 发送消息
void send(Destination destination, Message message);
void send(Destination destination, Message message, int deliveryMode, in tpriority, long timeToLive);
void send(Message message);
void send(Message message, int deliveryMode, int priority, long timeToLive);
其中deliveryMode为传送模式,priority为消息优先级,timeToLive为消息过期时间。
如:producer.send(message);
|
6、 MessageConsumer
MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。
6.1、创建MessageConsumer
MessageConsumer createConsumer(Destination destination);
MessageConsumer createConsumer(Destination destination, String messageSelector);
MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
TopicSubscriber createDurableSubscriber(Topic topic, String name);
TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);
其中messageSelector为消息选择器;noLocal标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。
如:MessageConsumer consumer = session.createConsumer(destination);
|
6.2、消息的同步与异步接收
消息的同步接收是指客户端主动去接收消息,客户端可以采用MessageConsumer的receive方法去接收下一个消息。
消息的异步接收是指当消息到达时,ActiveMQ主动通知客户端。客户端可以通过注册一个实现 MessageListener接口的对象到MessageConsumer。MessageListener只有一个必须实现的方法onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。
Message receive()
Message receive(long timeout)
Message receiveNoWait()
其中timeout为等待时间,单位为毫秒。
或者实现MessageListener接口,每当消息到达时,ActiveMQ会调用MessageListener中的
onMessage函数。
如:Message message = consumer.receive();
|
6.3、消息选择器
JMS提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。
消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为 MessageConsumer创建的一部分。
如:public final String SELECTOR = “JMSType = ‘TOPIC_PUBLISHER’”;
该选择器检查了传入消息的JMSType属性,并确定了这个属性的值是否等于 TOPIC _PUBLISHER。如果相等,则消息被消费;如果不相等,那么消息会被忽略。
分享到:
相关推荐
JMS之ActiveMQ入门案例java工程代码,包括点对点消息模式,发布者--订阅者消息模式。
AcitveMQ消息队列
NULL 博文链接:https://flyer2010.iteye.com/blog/662047
android开发,订阅activemq,然后向activemq发送消息,也可收消息,完整代码,可运行
KahaDB存储 KahaDB他是默认的持久化策略,所有消息都会顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对...
ActiveMQ使用的一个小demo,助你快速入门。
本文档总结了Activemq的所有特性,包括整体架构、消息传输原理、部署、消息存储和通讯机制,文字结合图示,读了这个文档,会对Acitvemq有个非常透彻的了解
AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。
ActiveMQ小心的,帮助你了解 acitvemq基本设置等,很好的。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
2、本机需要安装acitvemq,安装过程参考[https://blog.csdn.net/weixin_41806489/article/details/104997519] (https://www.csdn.net/). 注意: 一般用python是通过stomp协议进行通讯 操作过程 1、生产方 prublishi....
AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。Producer客户端使用来发送消息的,Consumer客户端...
一个订阅通道,支持多个客户端监听,当某个客户端掉线后,再上线的时候可以收到它没有接收到的消息。