PHP 开发者的 BlazeDS 和 JMS 指南,第一部分

阅读数:3452 2010 年 7 月 5 日

话题:JavaPHP语言 & 开发

BlazeDS 是来自 Adobe 公司的一个开源项目,它可以让您的 Flex 应用程序与数据服务进行连接。JMS(Java 消息服务)是用 Java 编写的与服务相互通信的一种方法。本文有助于您体会使用 JMS 的优点,以及在 Flex 应用程序中如何使用 BlazeDS 通过 JMS 与 Java 服务进行通信。

JMS 概述

JMS 是一个 API(应用编程接口)标准,它允许您使用 Java EE 技术发送和接收消息。在 Java 社区很多软件商都提供了 JMS 的商业和开源实现,您可以根据自己的需求自由选择软件商。

使用 JMS 的优点

使用 JMS 有几个优点,包括抽象、可靠的传递、异步消息,以及故障转移和高可用性。其中,抽象是非常重要的优点,因为 JMS 的消费者和生产者不必彼此相互依赖。无论是消费者端还是生产者端的代码都可以改变,只要 JMS 消息保持一致,两者间的连接就不会中断。

JMS 通常支持两种形式的消息——持久化的消息和非持久化的消息。非持久化的消息是在内存中进行处理的,速度很快,但可能会因系统故障而丢失消息。持久化的消息则会把消息写入磁盘,即便系统发生故障也可找回消息,但相应的处理速度较慢。不管是否使用事务处理,JMS 消息提供者都会确保所有持久化消息只被可靠传递一次。异步消息传递则提供了发送消息而无需等待响应的能力,其在 Web 应用中非常有用,当发送请求后应用程序无需阻塞或挂起等待响应完成。这种消息传递类型对于需要长时间处理的应用(如酒店预订或构建动态文档等应用)来说是一个理想选择。

只要配置正确,很多 JMS 的实现都提供了故障转移和高可用性能力。因此,如果某个正在运行 JMS 的服务器突然失效了,相同 JMS 实现的另一台服务器能够处理其负载。对于需要提供业务连续性的应用程序来说,故障转移是一个理想选择。

使用 JMS 和 BlazeDS 的优点

在使用 BlazeDS 连接 JMS 时,您的 Flex 应用程序可以获得使用 JMS 的所有好处,而过去这些好处往往只有完全在 J2EE 平台上实现的 Web 应用才能享受。

使用 BlazeDS 和 JMS 向服务发送消息,可以让您的 Flex 应用程序开发更加独立于服务。服务的 JMS 实现甚至可以在更换软件供应商,而您的 Flex 应用程序不用做任何修改。

使用 BlazeDS 和 JMS 的另一个主要优点是,如果您在使用 JMS 之前正在使用超文本传输协议(HTTP),那么您的 Flex 应用程序的代码基本不需要做什么改变,BlazeDS 会为您处理的所有细节。

理解 JMS 消息

JMS 提供了两种类型的消息传递模型:点对点(P2P)和发布 / 订阅(pub/sub)模型。简单地说,发布 / 订阅是一对多的方式广播消息,而点对点则是一对一的方式传递消息。

在 JMS 中的消息客户端被称为 JMS 客户端;而消息系统则被称为面向消息的中间件(MOM——Messaging Orientated Middleware),它也是 JMS 提供者。另外,产生消息的 JMS 客户端被称为是生产者,而接收消息的 JMS 客户端被称为消费者。

P2P 模型

P2P 消息模型允许 JMS 客户端通过名为队列(Queue)的虚拟信道发送和接收消息。JMS 队列可视为消息的容器,就像一个邮箱。JMS 发送者推送一条消息到 JMS 消息队列就如同寄件人把一封信件放到了邮箱。而后,接收者从邮箱中获得消息并对之采取某种动作。就像邮箱中的信件,使用 JMS 队列从发件人发送的消息由单个接收者读取。尽管 JMS 对于 P2P 也支持类似于发布 / 订阅模型所使用的“推”的方式,传统的 P2P 模型却是一个基于“拉”或基于“轮询”的模型,是从队列中请求消息,而不是把消息推送到客户端。

可能用到 JMS 消息和 Flex 客户端的一个实例是:从 Flex 客户端初始化一个进程,该进程需要服务层花费一些时间才能完成,比如创建一个按需定制的 PDF 文件。

发布 / 订阅模式

在发布 / 订阅模式中,生产者可以通过名为主题(Topic)的虚拟信道发送一个信息给多个消费者。主题与印刷杂志类似:订阅者向出版商注册订阅,接收指定的杂志。出版商定期向不同的订阅者发送同一杂志的副本。发布 / 订阅模型总的来说是基于推的模型,消息被广播到消费者,而不是通过请求或拉的方式。

使用 JMS 主题,您可以从服务层一次发送信息到多个 Flex 客户端。在 Flex 客户端使用 JMS 主题的一个例子是用户上线。在这个例子中,当用户登录到该系统,服务层会经由 JMS 主题发送一个消息,告诉其他所有的 Flex 客户端该用户已上线。

JMS 消息

JMS 提供一种方式来构建信息,使得主题和队列使用起来更加容易。当编写 Java 代码时,JMS API 提供的方法能以一致的方式来正确构建消息。而使用 BlazeDS 的一个好处是,您不必担心怎样创建正确的信息,BlazeDS 会辅助实现这一点的。

JMS 的实现

JMS 的实现有若干,既有开源项目也有商业项目。本文使用的 JMS 实现是 ActiveMQ,它是 Apache 软件基金会的一个开放源码的项目,以 Apache 许可证发布。ActiveMQ 除了提供一个 JMS 的实现外,还提供使用其它编程语言发送消息的功能,包括 PHP。其它的 JMS 实现还包括 IBM 的 WebSphere MQ,JBoss 的 HornetQ(注:前身是 JBoss Messaging,开放的消息队列),以及 OpenJMS。

配置 BlazeDS 使用 JMS

若要 BlazeDS 使用 JMS,其配置包括:建立正确的端点(endpoint)并在终点(destination)配置中设置 JMS 属性。本文的示例用到了一个简单的 Flex 界面、BlazeDS 库,以及 ActiveMQ。

JMSAdapter

当消息到达 MessageBroker Servlet 后,BlazeDS 使用 JMSAdapter 来完成 Flex 应用程序和 JMS 的实现之间的消息转换。JMSAdapter 在 messages-config.xml 文件中配置:

<adapters>
	<adapter-definition id="actionscript" class="flex.messaging.services.messaging.adapters.ActionScriptAdapter" default="true" />
	<adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter" />
</adapters>

端点的不同

一定要记住,BlazeDS 的 Java 库和 MessageBroker servlet 需要进行配置并运行于 Web 容器。由于 MessageBroker 必须处于运行状态,使用 JMS 适配器的终点(destination)仍要使用 AMF(Adobe Action Message Format)通道。在本例中使用的端点是:

<?xml version="1.0" encoding="UTF-8"?>
<services-config>
   <services>
       <service-include file-path="messaging-config.xml" />
   </services>

   <security/>

   <channels>
      <channel-definition id="my-amf" class="mx.messaging.channels.AMFChannel">
         <endpoint url="http://{server.name}:8161/blazeds/messagebroker/amf" class="flex.messaging.endpoints.AMFEndpoint"/>
      </channel-definition>

      <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
         <endpoint url="http://{server.name}:8161/blazeds/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
      </channel-definition>

      <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
         <endpoint url="http://{server.name}:8161/blazeds/messagebroker/amfpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
         <properties>
            <polling-enabled>true</polling-enabled>
            <polling-interval-seconds>1</polling-interval-seconds>
         </properties>
      </channel-definition>
   </channels>
</services-config>

JMS 属性

JMS 需要一些自己的配置,比如连接工厂用于建立 JMS 连接的 JNDI(Java 命名和目录接口)。队列名或主题名也是使用 JNDI 进行解析的。JNDI 是一个 Java API,允许把资源的物理位置抽象成名字。JNDI 的常见用法是数据库命名,应用程序使用此 JNDI 名获得数据库连接。

除了连接工厂、主题或队列的 JNDI 名,JMS 变量还允许您定义用户证书、消息类型、终点类型,等等。下面展示的是在本例中使用的 JMS 属性:

<destination id="my-jms-destination">
   <properties>
      <jms>
         <destination-type>Topic</destination-type>
         <message-type>javax.jms.TextMessage</message-type>
         <connection-factory>ConnectionFactory</connection-factory>
         <destination-jndi-name>dynamicTopics/MyTopic</destination-jndi-name>
         <delivery-mode>NON_PERSISTENT</delivery-mode>
         <message-priority>DEFAULT_PRIORITY</message-priority>
         <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
         <initial-context-environment>
            <property>
               <name>Context.INITIAL_CONTEXT_FACTORY</name>
               <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
            </property>
            <property>
               <name>Context.PROVIDER_URL</name>
               <value>tcp://localhost:61616</value>
            </property>
         </initial-context-environment>
      </jms>
   </properties>
   <channels>
      <channel ref="my-amf" />
   </channels>
   <adapter ref="jms" />
</destination>

您可以查看BlazeDS 文档以获得进一步的属性解释。

从 BlazeDS 发送消息

要尝试本文的例子,请在创建您的测试 Flex 项目之前,按照下面的步骤行事。下载ActiveMQ 二进制发布包并解压,您可以启动 ActiveMQ 并完成ActiveMQ 文档中的验证步骤,但在继续下一步之前请先关闭 ActiveMQ。

下载 BlazeDS 二进制发布包

首先要下载BlazeDS 二进制发布包

将 BlazeDS 发布包 blazeds.war 解压到 ActiveMQ 的 webapps 目录下。这么做可保证您运行的例子中只使用了 ActiveMQ。

在 Jetty 服务器中建立 Web 应用上下文,Jetty 是一个 Web 容器,ActiveMQ 二进制发布包自带了 Jetty。如要增加上下文,需修改 jetty.xml 文件,它位于 ActiveMQ 目录下的 conf 目录:

<webAppContext contextPath="/blazeds" resourceBase="${activemq.base}/webapps/blazeds" logUrlOnStart="true"/> 

启动 ActiveMQ,在浏览器地址栏输入http://localhost:8161/blazeds。应该可以看到 BlazeDS 的目录列表。这一步是验证你已经把 BlazeDS WAR 放在正确的位置,并配置正确。

创建一个 Flex 项目,它包含两个配置文件,配置文件位于 src/WEB-INF/flex 目录,分别为:messaging-config.xml 和 services-config.xml。使用文章前述的例子来设置 JMS 的终点,并添加 JMSAdapter。

把 Flex 应用项目中的 services-config.xml 和 messaging-config.xml 文件复制到 ActiveMQ 的 webapps/blazeds/WEB-INF/flex 目录。

发送消息(示例)

为测试从 Flex 发送的消息,本文使用了 ActiveMQ 自带的命令行例子。为从 Flex 应用程序中发送一条消息到命令行客户端,您可以在 Flex 代码中建立一个生产者(producer),然后用不同的参数运行命令行例子,并查看结果。

您需要在 Flex 应用程序中创建一个生产者来发送消息。作为一个终点,这个新生产者将使用配置在 messaging-config.xml 文件中的 my-jms-destination。下面这个例子是一个非常简单的带有消息生产者(producer)的 Flex 表单:

<?xml version="1.0" encoding="utf-8"?> 
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute" initialize="consumer.subscribe()"> 
   <mx:Script> 
      <![CDATA[ 
         import mx.messaging.messages.IMessage; 
         import mx.messaging.messages.AsyncMessage; 
         import mx.messaging.events.MessageEvent; 
         private function sendMessage(value : String) : void 
         { 
            var message : IMessage = new AsyncMessage(); 
            message.body = value; 
            producer.send(message); 
          } 
      ]]> 
   </mx:Script> 
   <mx:Producer id="producer" 
      channelConnect="trace('producer connected')" 
        destination="my-jms-destination" 
        fault="trace(event.faultDetail)" 
   /> 
   <mx:VBox> 
      <mx:Form> 
         <mx:FormItem label="Message:"> 
         <mx:TextInput id="textInput" /> 
      </mx:FormItem> 
      </mx:Form> 
      <mx:Button id="sendButton" label="Send Message" click="sendMessage(textInput.text)" /> 
   </mx:VBox> 
</mx:Application> 

界面上有一个按钮 sendButton,点击此按钮会调用 sendMessage() 函数。sendMessage() 函数先创建一个 AsyncMessage,其是 IMessage 接口的一个实现,然后使用 producer 的 send() 方法发送此消息。赋值给 fault 属性的 trace() 方法会把给定的信息打印到控制台,从而帮助您调试任何可能发生的问题。

验证消息:运行 Flex 应用程序;切换到命令行,运行 ActiveMQ 自带的消费者(consumer)实例,脚本如下:

$ ant consumer -Dtopic=true -Dsubject=MyTopic -Dmax=2 

如果你输入框中输入消息,然后点击发送消息按钮,该消息会出现在消费者端的命令行上。在收到两条消息后,消费者端的命令行例子会自动退出。

接收消息(示例)

要接收 JMS 消息,您需要构建一个使用 JMS 终点的消费者(consumer)。在例子中为简单起见,消费者重用了生产者用来发送消息的终点。创建消费者的代码大致如下:

<mx:Consumer id="consumer" channelConnect="trace('consumer connected')" 
   channelFault="trace(event.faultDetail)" 
   fault="trace(event.faultDetail)" 
   destination="my-jms-destination" 
   message="messageHandler(event)" /> 
... 
<mx:TextArea id="results" width="100%" /> 

在 <mx:Script> 代码块中处理消息的函数大致如下:

private function messageHandler(event : MessageEvent) : void { 
   results.text += event.message.body + "\n"; 
} 

添加了代码后,启动 Flex 应用程序。当 Flex 应用启动后,在命令行运行生产者的例子,脚本如下:

ant producer -Dtopic=true -Dsubject=MyTopic -Dmax=5 

消息将出现在 Flex 的文本框中。

通过 REST 服务与 PHP 集成

Web 服务是集成 PHP Web 应用和 Java 的方法之一。REST Web 服务在 Java 中作为 Servlets 以及在 PHP 中作为脚本实现都比较容易。

REST 概述

REST Web 服务比 SOAP Web 服务更为简单,因为 REST Web 服务没有一个标准的消息格式(例如,单个的网页可以是一个 REST Web 服务)。URL 地址就是服务端点(endpoint),而返回的 Web 页面则是该服务的响应。

编写 REST Web 服务的一个优点是可被多种类型的客户端使用。REST Web 服务示例如下,它作为一个 Java Servlet,能被其它语言如 PHP 调用,而无需修改服务。

Java Servlet 示例

本例使用了一个简单的 Java Servlet 来作为 REST Web 服务。此 Servlet 处理 PUT 请求,从 PUT 请求的内容和各种参数中得到消息,例如:得到目标对象(JMS 终点的名字),此消息是否是一个主题, JMS 连接使用的 URL 等。

package com.example.servlets;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URLDecoder;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

public class MessageService extends HttpServlet {
    private static final long serialVersionUID = -8129137144911681674L;

    private Destination destination;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    @Override
    protected void doPut(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        Connection connection = null;            

        try {
            boolean isTopic = "true".equals(request.getParameter("topic"));
            boolean isPersistent = "true".equals(request.getParameter("persistent"));
            String subject = request.getParameter("subject");
            String url = URLDecoder.decode(request.getParameter("url"), "UTF-8");
            String messageText = "";

            BufferedReader buffer = new BufferedReader(new InputStreamReader(request.getInputStream()));

            messageText = buffer.readLine();

            System.out.println("Using URL:  <" + url + ">");
            System.out.println("Using Subject:  <" + subject + ">");
            System.out.println("Sending Message Text:  <" + messageText + ">");

            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            if (isTopic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            if (isPersistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }

            TextMessage message = session.createTextMessage(messageText);

            producer.send(message);

            System.out.println("Done.");

            // Use the ActiveMQConnection interface to dump the connection
            // stats.
            ActiveMQConnection c = (ActiveMQConnection)connection;
            c.getConnectionStats().dump(new IndentPrinter());

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
        }

    }

}

除了用 ActiveMQ 特定类,你还可以使用 JNDI 来写代码。对于想了解使用 JNDI 来获得连接的更多信息,请参阅ActiveMQ 文档

编译后的 Servlet 类文件(比如前面的 MessageService 类)必须放在 ActiveMQ 的 webapps 目录下。要添加新目录,则需要在 conf 目录的 jetty.xml 配置文件中添加入口(与前面的步骤 4 相似)。

PHP 示例

PHP 脚本使用CURL 库(译者注:原文的 CURL 库的链接有错)调用 REST 服务,将数据发送到 REST 服务的 URL(由 Java Servlet 处理)。如果您的 PHP 版本没有包含 CURL 库,您可以使用 Ajax 来调用 REST Web 服务。在 Web 应用的实际产品中,Ajax 可能会提供更好的用户体验。  

PHP 脚本包含一个表单来提交自身。当提交表单时,该脚本从文本框取得消息,并把消息作为数据发送到 REST 服务的 URL。请看下面的例子:

<?php 
   if (isset($_POST['submit'])) { 
      $url = 'http://localhost:8161/messageservice/?url=tcp://localhost:61616&subject=MyTopic&topic=true'; 
      function doPut($ch, $a_data) { 
         curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PUT'); 
         curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Length: '.strlen($a_data))); 
         curl_setopt($ch, CURLOPT_POSTFIELDS, $a_data); 
         return curl_exec($ch); 
      } 
      $data = $_POST['message']; 
      $ch = curl_init(); 
      curl_setopt($ch, CURLOPT_URL, $url); 
      $response = doPut($ch, $data); 
      echo 'Message sent'; 
   } else { 
      ?> 
         <html><head><title>Post a message</title></head> 
         <body> 
         <form action="<?php echo $_SERVER['PHP_SELF'];?>" method="post"> 
         <p>
            <label for="message">Message: 
               <input type="text" id="message" name="message" /> 
            </label>
         </p> 
         <input type="submit" value="submit" name="submit" id="submit" /> 
         </form> 
         </body> 
      </html> 
      <?php 
   } 
?> 

在 URL 上传递的 JMS 服务器的 topic、subject 和 URL 信息都赋值给了 $url。

用 BlazeDS 发送 / 接收的示例

要想看 BlazeDS 发送 / 接收的实例,启动 Flex 应用程序。当应用程序打开,在不同的浏览器上打开 PHP 脚本,给消息框添加一个值,然后单击提交。消息框的值将会出现在 Flex 应用的文本框中。

小结

JMS 是一个消息传递服务,支持主题和队列两种模式以及其它的很多特征,是可靠消息传递的不错选择。 BlazeDS 可以让您只需做少量工作即可完成从 Flex 客户端向 JMS 发送消息。

PHP 应用程序可以以多种方式连接到 JMS。使用 REST Web 服务是把 PHP 和 Java 应用程序整合在一起的一种方式——REST Web 服务提供了一种相对简单的接口供客户端使用。使用 Java 实现 REST 服务,您可以利用现有的 Java 代码来发送 JMS 消息。

本系列的第 2 部分主要讲通过 JMS 整合 PHP 和 Flex 应用的其它方法:PHP / Java 桥和 STOMP。

译者注:STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为 MOM(Message Oriented Middleware,面向消息的中间件) 设计的简单文本协议。

查看英文原文:BlazeDS and JMS for PHP Developers, Part 1


译者简介:李强,计算机硕士,毕业于电子科技大学,目前在四川长虹电器股份有限公司技术中心从事研发工作,主要研究领域是 SOA、ESB、Web 服务以及分布式应用等。

感谢宋玮对本文的审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家加入到InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。