基于AWS技术实现发布/订阅服务

2013 年 8 月 20 日

AWS 提供两种服务——Amazon 简单通知服务(Simple Notification Service)和 Amazon 简单队列服务(Simple Queue Service),两者结合起来可以为完整的发布 / 订阅服务提供支撑。

现有的 AWS 功能

Amazon简单通知服务(Amazon SNS)是一个Web 服务,能让应用、最终用户和设备立即从云端发送和接收通知。简化的SNS 架构如下图所示(图1):

(点击查看大图)

图1:Amazon SNS 的基础架构

多个发布应用和多个订阅应用可以将SNS 主题作为中介互相通讯。这样实现的优点是发布者和订阅者不需要知道对方,因此,应用可以完全动态地进行集成。SNS 支持用多种传输协议传递通知,包括HTTP、HTTPS、Email、SMS 和Amazon 简单队列(Simple Queue)。

Amazon简单队列服务(Amazon SQS)提供可靠、可伸缩的托管队列,用来存储计算机之间传输的消息。使用Amazon SQS,你可以在执行不同任务的应用分布式组件之间移动数据,而不会丢失消息,也不必要求每个组件始终都是可用的。SQS 和SNS 结合起来会带来两个额外的优势——解除时间上的耦合度,根据消费应用特定的情况提供负载均衡——这是SNS 无法单独提供的。要做到第二个附加优势,需要同一个应用的多个实例从同一个队列里读取消息。下图展示了SNS 和SQS 结合的总体架构(图2)。其中的一个订阅应用显示为负载均衡的。

(点击查看大图)

图2:结合SNS 和SQS

这个实现的主要缺点是,发布者和订阅者需要明确统一SNS 主题的名称。此外,如果一个特定的消费者想从多个主题获取信息,那他需要把队列注册到多个主题上。

期望中的发布/ 订阅实现

这个问题的典型解决方案是采用基于树的主题组织,大部分发布/ 订阅引擎都是这样实现的。OASIS 规范的 Web Services Topics 1.3 概述了这种组织的主要原则。

这个规范将主题定义为:

“……主题是一组通知的组织和分类方式。主题机制为订阅者推断出感兴趣的通知提供了便捷的方式……发布者可以将通知发布和一或多个主题关联起来。当订阅者创建订阅的时候,可以提供一个主题的过滤器表达式,将订阅和一或多个主题关联起来……每个主题都可以有零或多个子主题,子主题本身也可以进一步包含子主题。没有 **“父亲”的主题叫根主题 **。特定的根主题和它所有的后代会形成一个层次结构(称为主题树)。”

下面是手机销售的一个主题树例子(图 3)。

图 3:主题树示例

主题树的根表示销售。销售可以按区域细分(在我们的例子中有北美、欧洲和亚太地区)。特定区域的销售还可以按照手机类型进一步细分,依此类推。

在发布 / 订阅系统中,这样的结构之所以重要是因为树反映了数据的组织。如果消费者对北美的智能手机销售感兴趣,他可以监听这个特定的主题。如果他对北美所有的销售都感兴趣,那他就可以监听北美的主题,从子主题获取所有的通知。

当然,这种方法并不能解决所有的问题。比如说,如果消费者想监听所有智能手机销售的事件,他就需要明确订阅所有地区的智能手机销售事件。这种情况通常是主题树设计的问题。树的设计基于信息的组织和典型的使用模式。在某些情况下,会设计多个主题来满足不同的内部需求(参见 Web Services Topics 1.3 里的主题命名空间)。发布 / 订阅架构的另一个重要特性就是基于内容的消息过滤

“在基于内容的系统中,如果消息的属性或内容与订阅者定义的约束相匹配,消息就只会传递给这个订阅者。订阅者负责消息的分类。”

换句话说,订阅者在这种情况下可以使用正则表达式列表,明确指定他们感兴趣的消息内容。

把这种过滤和结构化的主题结构结合起来,可以创建出非常灵活和强大的发布 / 订阅实现。

我们将在本文中展示如何用 AWS 组件轻松构建这类系统。

发布 / 订阅架构建议

建议给大家的架构如下图所示(图 4)。在这个架构中,发布 / 订阅服务器的实现是一个 Tomcat 容器里运行的 Web 应用。我们还充分利用了 AWS 的弹性负载均衡器(Elastic Load Balancer),它可以根据当前的负载动态扩展或缩减发布/ 订阅服务器集群的大小。此外,架构还用关系型数据服务(Relational Data Service)存储当前的配置,以便动态新增发布/ 订阅实例。为了提高整体性能,我们在内存里保留了当前的拓扑结构,尽量减少数据库访问的次数。这样的话,实际的消息路由会非常迅速。这个解决方案需要一种机制,能在拓扑结构发生变化的时候去通知所有的服务器(因为任何服务器都能处理负载均衡器)。Amazon SNS 能轻而易举地做到这一点。最后,我们用Amazon SQS 将通知分发给消费者。需要注意的是,一个消费者可以监听多个队列。

(点击查看大图)

图4:整体架构建议

发布/ 订阅服务器

这个实现的核心是一个自定义的发布/ 订阅服务器。服务器实现包括三个主要的层——持久化、域和服务。

持久化

服务器持久化层采用 JPA 2.0 实现,定义了三个主要的实体——主题、订阅和语义过滤器。

主题实体(清单 1)描述了特定主题要存储的相关信息,包括主题 ID(数据库的内部 ID)、主题名称(标识主题的字符串)、一个布尔变量(定义该主题是否是个根主题)、到父主题和孩子主题的引用(以便对主题层次结构进行遍历),以及与给定主题关联的订阅列表。

复制代码
<span>@Entity</span>
<span>@NamedQueries</span>({
    <span>@NamedQuery</span>(name=<span>"Topic.RootTopics"</span>,
                    query=<span>"SELECT t FROM Topic t where t.root='true'"</span>),
  <span>  @NamedQuery</span>(name=<span>"Topic.AllTopics"</span>,
                     query=<span>"SELECT t FROM Topic t"</span>)
})
<span>@Table</span>(name = <span>"Topic"</span>)
<span>public class</span> Topic {
<span>@Id @GeneratedValue</span>(strategy=GenerationType.<span>IDENTITY</span>)
<span>private long</span> <span>id</span>;    <span>// 自动生成的 ID</span>
<span>@Column</span>(name = <span>"name"</span>,nullable = false, length = 32)
<span>private</span> String <span>name</span>;                 <span>// 主题名称 </span>
  
@Column(name = <span>"root"</span>,nullable = false)
<span>private</span> Boolean <span>root</span> = false;         <span>// 根主题标识 </span>   
<span>@ManyToOne</span>(fetch=FetchType.<span>LAZY</span>)
<span>@JoinColumn</span>(name=<span>"TOPIC_ID"</span>)
<span>private</span> Topic <span>parent</span>;
<span>@OneToMany</span>(mappedBy=<span>"parent"</span>,cascade=CascadeType.<span>ALL</span>,orphanRemoval=<span>true</span>)
<span>private</span> List<Topic> <span>children</span>;
<span>@OneToMany</span>(mappedBy=<span>"topic"</span>,cascade=CascadeType.<span>ALL</span>,orphanRemoval=<span>true</span>)
<span>private</span> List<Subscription> <span>subscriptions</span>;
………………………………………………………………………………………………

清单 1:主题实体

我们定义了两个命名的查询,用来访问主题:RootTopics 获取从根开始的主题结构,AllTopics 获取所有现有的主题。

这个实体提供了一个完整的主题定义,也可以支持多个主题树(而不是实现示例的一部分)。

订阅实体(清单 2)描述了订阅相关的信息,包括订阅 ID(数据库的内部 ID)、队列名称(SQS 队列的 ARN,ARN 即 Amazon Resource Name)、对订阅关联主题的引用,还有一个语义过滤器列表。只有所有的过滤器都接受消息(见下文),通知才会分发给给定的队列(客户端)。如果通知不包含语义过滤器,那来自于关联主题的所有消息都会直接传递给队列。

复制代码
<span>@Entity</span>
<span>@NamedQueries</span>({
<span>@NamedQuery</span>(name=<span>"Subscription.AllSubscriptions"</span>,
                    query=<span>"SELECT s FROM Subscription s"</span>)
})
<span>@Table</span>(name = <span>"Subscription"</span>)
<span>public class</span> Subscription {
<span>@Id @GeneratedValue</span>(strategy=GenerationType.<span>IDENTITY</span>)
<span>private long</span> <span>id</span>;    <span>// 自动生成的 ID</span>
<span>@Column</span>(name = <span>"queue"</span>,nullable = <span>false</span>, length = 128)
<span>private</span> String <span>queue</span>;
 
<span>@ManyToOne</span>(fetch=FetchType.<span>LAZY</span>)
<span>@JoinColumn</span>(name=<span>"TOPIC_ID"</span>)
<span>private</span> Topic <span>topic</span>;
    
<span>@OneToMany</span>(mappedBy=<span>"subscription"</span>,
cascade=CascadeType.<span>ALL</span>,orphanRemoval=<span>true</span>)
<span>private</span> List<SemanticFilter> <span>filters</span>;
……………………………………………………………………………………

清单 2:订阅实体

我们还定义了一个命名的查询,获得所有存在的订阅。

最后,语义过滤器实体(清单 3)描述了特定语义过滤器的信息,包括语义过滤器 ID(数据库的内部 ID)、该语义过滤器测试的属性名称、使用的正则表达式,以及对语义过滤器关联订阅的引用。

复制代码
<span>@Entity</span>
<span>@NamedQueries</span>({
<span>@NamedQuery</span>(name=<span>"SemanticFilter.AllSemanticFilters"</span>,
                    query=<span>"SELECT sf FROM SemanticFilter sf"</span>)
})
<span>@Table</span>(name = <span>"Filter"</span>)
<span>public class</span> SemanticFilter {
<span>@Id @GeneratedValue</span>(strategy=GenerationType.<span>IDENTITY</span>)
<span>private long</span> <span>id</span>;    <span>// 自动生成的 ID</span>
    
<span>@Column</span>(name = <span>"attribute"</span>,nullable = <span>false</span>, length = 32)
<span>private</span> String <span>attribute</span>;                 <span>// 属性名称 </span>
<span>@Column</span>(name = <span>"filter"</span>,nullable = false, length = 128)
<span>private</span> String <span>filter</span>;                <span> // 正则表达式过滤器 </span>
<span>@ManyToOne</span>(fetch=FetchType.<span>LAZY</span>)
<span>@JoinColumn</span>(name=<span>"SUBSCRIPTION_ID"</span>)
<span>private</span> Subscription <span>subscription</span>;
……………………………………………………………………

清单 3:语义过滤器实体

我们一样定义一个命名的查询,用来获取所有现有的语义过滤器。

除了实体,持久化层还包含一个持久化管理类,负责:

管理数据库访问和事务

从数据库读取、写入对象

对域对象(见下文)和持久化实体进行相互转换

发送拓扑结构变化的通知

域模型

域模型对象的主要职责是支持服务操作,包括数据的订阅和发布,并把通知真正发布到订阅的队列上。在这个简单的实现里,域模型和持久化模型是合在一起的,但为了阐述得更清楚,我们分开介绍。这两层的数据模型是一样的,但域对象会多一些明确支持发布 / 订阅实现的方法。

过滤器处理的实现(清单 4)利用了 Java String 里对正则表达式处理的内置支持

复制代码
<span>public boolean</span> <span>accept</span>(String value){
       <span>if</span>(value == <span>null</span>)
             <span>return false</span>;
       <span>return</span> value.matches(<span>_pattern</span>);
}

清单 4:过滤器处理方法

发布实现(清单 5)是订阅类的一个方法。请注意,这个方法对语义过滤器进行了或操作。如果给定的客户端能有多个订阅,或者对订阅实现进行扩展、让它支持 Boolean 函数,那就可以突破这个限制了。

复制代码
<span>public void</span> <span>publish</span>(Map<String, String> attributes, String message){
    <span>if</span>((<span>_filters</span> <span>!= null</span>) && (<span>_filters</span>.size() > 0)){
        <span>for</span>(DomainSemanticFilter f : <span>_filters</span>){
            String av = attributes.get(f.getField());
            <span>if</span>(av == <span>null</span>)
                <span>return</span>;
            <span>if</span>(!f.accept(av))
                <span>return</span>;
        }
    }
    SQSPublisher.getPublisher().sendMessage(<span>_queue</span>, message);
}

清单 5:发布实现

这个实现利用了基于现有 AWS Java API 的 SQSPublisher 类(清单 6)。

复制代码
import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
public class SQSPublisher {
 private static SQSPublisher _publisher;
  
 private AmazonSQSClient _sqs;    
    
 private SQSPublisher()throws IOException {
         AWSCredentials credentials = new PropertiesCredentials(
                this.getClass().getClassLoader().
getResourceAsStream("AwsCredentials.properties"));
         _sqs = new AmazonSQSClient(credentials);
}
 public String createQueue(String name){
         CreateQueueRequest request = new CreateQueueRequest(name);
          return _sqs.createQueue(request).getQueueUrl();
 }
 public void sendMessage(String queueURL, String message){
         SendMessageRequest request = new SendMessageRequest(queueURL,
message);
          _sqs.sendMessage(request);
 }
public void deleteQueue(String queueURL){
         DeleteQueueRequest request = new DeleteQueueRequest(queueURL);
          _sqs.deleteQueue(request);
 }
 public static synchronized SQSPublisher getPublisher(){
       if(_publisher == null)
             try {
                 _publisher = new SQSPublisher();
             }catch (IOException e) {
                 e.printStackTrace();
             }
         return _publisher;
 }
}

清单 6:SQS 发布者

订阅者可以利用这个类的其他方法创建 / 销毁 SQS 队列。

除了 SQS 队列,我们的实现还利用 SNS 进行数据库变化的同步。与 SNS 的交互由 SNSPubSub 类实现(清单 7),这个实现也利用了 AWS SNS Java API。

复制代码
import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sns.model.UnsubscribeRequest;
public class SNSPubSub {
  private static SNSPubSub _topicPublisher;
  private static String _topicARN;
  private static String _endpoint;
    
  private AmazonSNSClient _sns;
  private String _protocol = "http";
  private String _subscriptionARN;
    
  private SNSPubSub()throws IOException {
         AWSCredentials credentials = new PropertiesCredentials(
                this.getClass().getClassLoader().
getResourceAsStream("AwsCredentials.properties"));
         _sns = new AmazonSNSClient(credentials);
  }
  public void publish(String message){
         PublishRequest request = new PublishRequest(_topicARN, message);
         _sns.publish(request);
  }
  
  <span>public void</span> subscribe(){
         SubscribeRequest request = <span>new</span> SubscribeRequest
(<span>_topicARN, _protocol, _endpoint</span>);
         <span>_sns</span>.subscribe(request);
  }
    
  <span>public void</span> <span>confirmSubscription</span>(String token){
         ConfirmSubscriptionRequest request = <span>new<br></br></span> ConfirmSubscriptionRequest(<span>_topicARN</span>, token);
         ConfirmSubscriptionResult result = <span>_sns<br></br></span>.confirmSubscription(request);
         <span>_subscriptionARN</span> = result.getSubscriptionArn();
  }
    
 <span> public void</span> unSubscribe(){
         <span>if</span>(<span>_</span><strong><span>subscribed</span></strong>){
             UnsubscribeRequest request = <span>new</span> UnsubscribeRequest(<span>_subscriptionARN</span>);
            <span> _sns<br></br></span>.unsubscribe(request);
         }
  }
    
  public static void configureSNS(String topicARN, String endpoint){
         _topicARN = topicARN;
       _endpoint = endpoint;
  }
      
  public static synchronized SNSPubSub getSNS(){
         if(_topicPublisher == null){
             try{
                _topicPublisher = new SNSPubSub();
             }
             catch(Exception e){
                e.printStackTrace();
             }
         }
         return _topicPublisher;
  }
}

清单 7:SNS Pub/Sub

使用 SNS

使用 SNS 的时候要谨记:订阅主题并不意味着你已经准备好监听主题。SNS 订阅的过程包含两个步骤。向 SNS 发送订阅请求时,SNS 返回的响应表明确认订阅的必要性。这正是清单 8 既有 subscribe 方法又有 confirmSubscription 方法的原因。

复制代码
<xsd:complextype name="NotificationType">
<xsd:sequence>
<xsd:element name="Type" type="xsd:string" />
<xsd:element name="MessageId" type="xsd:string" />
<xsd:element name="Token" type="xsd:string" minoccurs="0" />
<xsd:element name="TopicArn" type="xsd:string" />
<xsd:element name="Message" type="xsd:string" />
<xsd:element name="SubscribeURL" type="xsd:string" minoccurs="0" />
<xsd:element name="Timestamp" type="xsd:string" />
<xsd:element name="SignatureVersion" type="xsd:string" />
<xsd:element name="Signature" type="xsd:string" />
<xsd:element name="SigningCertURL" type="xsd:string" />
<xsd:element name="UnsubscribeURL" type="xsd:string" minoccurs="0" />
</xsd:sequence>
</xsd:complextype>

上面的 Schema 描述了两种消息类型——确认请求和实际的通知。两种类型通过 Type 元素进行区分。如果元素值是“SubscriptionConfirmation”,那它就是订阅确认的请求,如果是“Notification”,就表明是个真正的通知。

主题类实现了两个方法(清单 8),以便支持发布。

复制代码
<span>public void</span> publish(Map<String, String> attributes, String message){
    
    <span>if</span>(<span>_subscriptions</span> == <span>null</span>)
        <span>return</span>;
    <span>for</span>(DomainSubscription ds : <span>_subscriptions</span>)
        ds.publish(attributes, message);
}
    
<span>public void</span> <span>processPublications</span>(List<DomainTopic> tList, StringTokenizer st) <span>throws</span> PublicationException{
    
    tList.add(<span>this</span>);
    <span>if</span>(!st.hasMoreTokens())
        <span>return</span>;
    String topic = st.nextToken();
    <span>for</span>(DomainTopic dt : <span>_children</span>){
        <span>if</span>(topic.equalsIgnoreCase(dt.getName())){
            dt.<span>processPublications</span>(tList, st);
            <span>return</span>;
        }
    }
    <span>throw new</span> PublicationException(<span>"Subtopic "</span> + topic + <span>" is not found in topic "</span> + <span>_name</span>);
}

清单 8:主题对发布的支持

processPublications 方法创建了一个主题列表,这些主题与给定的消息相关联。这个方法有一个标记过的主题树字符串,如果标记和主题名称相对应,就会把当前的主题添加到列表中。主题的 publish 方法维护一个消息属性的映射,对主题相关的每个订阅来说,publish 方法还会尝试着去发布一条消息。

上面的方法都由 Domain 管理器类的 publish 方法调用(清单 9)。这个方法首先标记主题字符串,然后用 processPublications 方法创建一个订阅者感兴趣的主题列表。列表一旦被创建好,就会构建一个消息属性的映射(我们假设是一个 XML 消息),并把这个映射发布给列表里的所有主题。

复制代码
<span>public void</span> <span>publish</span> (String topic, String message){
      StringTokenizer st = <span>new</span> StringTokenizer(topic, <span>"."</span>);
      List<DomainTopic> topics = <span>new</span> LinkedList<Domaintopic>();
      DomainTopic root = PersistenceManager.<strong>getPersistenceManager</strong>().getRoot();
      <span>try</span> {
             <span>if</span>(!st.hasMoreTokens())
                 <span>return</span>;
             String t = st.nextToken();
             <span>if</span>(!t.equalsIgnoreCase(root.getName()))
                 <span>throw new</span> PublicationException(<span>"Unrecognized subtopic name "</span> + topic);
             root.processPublications(topics, st);
      }<span>catch</span> (PublicationException e) {
             e.printStackTrace();
             <span>return</span>;
      }
      MessageType msg = <span>null</span>;
      <span>try</span> {
             JAXBElement<MessageType> msgEl = (JAXBElement<MessageType>)
                 <span>_unmarshaller</span>.unmarshal(new ByteArrayInputStream(message.getBytes()));
             msg = msgEl.getValue();
      } <span>catch</span> (JAXBException e) {
             e.printStackTrace();
             <span>return</span>;
      }
      Map<String, String> attributes = <span>new</span> HashMap<String, String>();
      MessageEnvelopeType envelope = msg.getEnvelope();
      <span>if</span>(envelope != <span>null</span>){
             <span>for</span>(MessageAttributeType attribute : envelope.getAttribute()){
                 attributes.put(attribute.getName(), attribute.getValue());
             }
      }
      <span>for</span>(DomainTopic t : topics)
             t.publish(attributes, message);
}

清单 9:发布方法实现

服务模型

我们用一组 REST 服务对发布 / 订阅功能进行访问(清单 10)。

复制代码
@Path(<span>"/"</span>)
<span>public class</span> PubSubServiceImplementation {
  <span>// 功能方法 </span>
  @POST
  @Path(<span>"publish"</span>)
  @Consumes(<span>"application/text"</span>)
  <span>public void</span> publish (@QueryParam(<span>"topic"</span>)String topic, String message) <span>throws</span> PublicationException{
         DomainManager.<strong>getDomainManager</strong>().publish(topic, message);
  }
  @<span>GET</span>
  @Path(<span>"publish"</span>)
  <span>public void</span> publishGet (@QueryParam(<span>"topic"</span>)String topic, @QueryParam(<span>"message"</span>)String message)  <span>throws</span>
PublicationException{
         DomainManager.<strong>getDomainManager</strong>().publish(topic, message);
  }
  @POST
  @Path(<span>"synch"</span>)
  @Consumes(<span>"text/plain"</span>)
  <span>public void</span> getSynchNotification (Object message){
         PersistenceManager.<strong>setUpdated</strong>();
  }
  <span>// 配置方法 </span>
  @<span>GET</span>
  @Path(<span>"root"</span>)
  @Produces(<span>"application/json"</span>)
  <span>public</span> TopicType getRoot()<span>throws</span> PublicationException {
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().getRoot();
  }
  @<span>GET</span>
  @Path(<span>"filters"</span>)
  @Produces(<span>"application/json"</span>)
  <span>public</span> FiltersType getFilters() <span>throws</span> PublicationException {
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().getFilters();
  }
  @POST
  @Path(<span>"filter"</span>)
  @Consumes(<span>"application/json"</span>)
  <span>public</span> <span>long</span> addFilter(FilterType filter) <span>throws</span> PublicationException {
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().addFilter(filter);
  }
  @DELETE
  @Path(<span>"filter/{id}"</span>)
  <span>public void</span> deleteFilter(@PathParam(<span>"id"</span>)<span>long</span> id) <span>throws</span> PublicationException {
         DomainManager.<strong>getDomainManager</strong>().removeFilter(id);
  }
  @<span>GET</span>
  @Path(<span>"subscriptions"</span>)
  @Produces(<span>"application/json"</span>)
  <span>public</span> SubscriptionsType getSubscriptions() <span>throws</span> PublicationException {
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().getSubscriptions();
  }
  @POST
  @Path(<span>"subscription"</span>)
  @Consumes(<span>"application/json"</span>)
  <span>public long</span> addSubscription(SubscriptionType s) <span>throws</span> PublicationException {
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().addSubscription(s, <span>null</span>);
  }
  @DELETE
  @Path(<span>"subscription/{id}"</span>)
  <span>public void</span> deleteSubscription(@PathParam(<span>"id"</span>)<span>long</span> id) <span>throws</span> PublicationException {
         DomainManager.<strong>getDomainManager</strong>().removeSubscription(id);
  }
  @POST
  @Path(<span>"subscriptionFilters/{sid}"</span>)
  @Consumes(<span>"application/json"</span>)
  <span>public long</span> assignFilersToSubscription(@PathParam(<span>"sid"</span>)<span>long</span> sid, IDsType ids)<span>throws</span> PublicationException{
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().assignFilersToSubscription(sid, ids);
  }    
  @POST
  @Path(<span>"topic"</span>)
  @Consumes(<span>"application/json"</span>)
  <span>public long</span> addTopic(TopicType t) <span>throws</span> PublicationException {
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().addTopic(t, <span>null</span>);
  }
  @DELETE
  @Path(<span>"topic/{id}"</span>)
  <span>public void</span> deleteTopic(@PathParam(<span>"id"</span>)<span>long</span> id) <span>throws</span> PublicationException {
         DomainManager.<strong>getDomainManager</strong>().removeTopic(id);
  }
  @POST
  @Path(<span>"topicsubscription/{tid}"</span>)
  @Consumes(<span>"application/json"</span>)
  <span>public void</span> assignTopicHierarchy(@PathParam("tid")<span>long</span> tid, IDsType ids) <span>throws</span> PublicationException{
         DomainManager.<strong>getDomainManager</strong>().assignTopicHierarchy(tid, ids);
  }
  @POST
  @Path(<span>"topicsubscription/{tid}"</span>)
  @Consumes(<span>"application/json"</span>)
  <span>public long</span> assignTopicSubscriptions(@PathParam(<span>"tid"</span>)<span>long</span> tid, IDsType ids)<span>throws</span> PublicationException{
         <span>return</span> DomainManager.<strong>getDomainManager</strong>().assignTopicSubscriptions(tid, ids);
  }

清单 10:发布 / 订阅服务

这些服务的使用者有消息发布者(publish 方法)、服务订阅者(创建 / 删除语义过滤器,订阅,还有订阅和主题订阅相关的过滤器)、内部的发布 / 订阅实现(获取同步的服务)和管理应用。

结论

这个实现虽然简单,但创建了一个非常强大、可扩展的发布 / 订阅实现,同时利用了很多现有的 AWS 功能和少量的 Java 定制代码。另外它还充分利用了现有 AWS 部署功能对负载均衡和容错的支持。

作者简介

Boris Lublinsky 博士是 Nokia 的主要架构师,参与大数据、SOA、BPM 和中间件实现的相关工作。Boris 去 Nokia 前是 Herzum 软件的主要架构师,负责为客户设计大型、可伸缩的 SOA 系统;在此之前,他是 CNA 保险的企业架构师,参与 CNA 集成和 SOA 策略的设计及实现,构建应用框架,实现面向服务的架构。Boris 在企业技术架构和软件工程方面有二十五年多的经验。他是 OASIS SOA RM 委员会的活跃成员,和他人一起编著了《Applied SOA: Service-Oriented Architecture and Design Strategies》一书,另外他还写了很多关于架构、编程、大数据、SOA 和 BPM 的文章。

查看英文原文:基于 AWS 技术实现发布 / 订阅服务

2013 年 8 月 20 日 07:187352
用户头像

发布了 151 篇内容, 共 52.2 次阅读, 收获喜欢 9 次。

关注

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布
暂无评论
发现更多内容

喝完可乐桶后程序员回归本源,开源Spring基础内容

小Q

Java spring 学习 源码 面试

保障系统稳定高可用的方案

天天向上

极客大学架构师训练营

第二周 框架设计 作业一 「架构师训练营 3 期」

feiyun123

极客大学架构师训练营 框架设计

Spring视图解析流程

无用且垂死的星辰

架构师训练营第二周作业

J

极客大学架构师训练营

框架设计作业

cc

架构师系列之8:python网站压测工具

桃花原记

DeFi交易所系统APP开发|DeFi交易所软件开发

开發I852946OIIO

系统开发

吴桐:2021年中国区块链产业发展的六大趋势

CECBC区块链专委会

区块链 新基建

搞懂这篇文章,关于IO复用的问题就信手拈来了

程序员小灰

Linux 后台开发 io epoll Linux服务器开发

我第一次,厚着脸皮晒生活:)

清菡

生活

依赖倒置原则以及接口隔离方式实现接口设计

我们新四军不拿群众一针一线

Windows下常用软件配置

jiangling500

windows 软件配置

那些年,支撑尾款人们熬夜的AI

脑极体

Week 11 work

黄立

Spring 源码学习 06:AnnotatedBeanDefinitionReader

程序员小航

Java spring 源码 源码阅读

区块链如何解决互联网为基础的广告困境?

CECBC区块链专委会

区块链 互联网广告

Defi挖矿软件系统开发|Defi挖矿APP开发

开發I852946OIIO

系统开发

深入了解Linux共享内存及函数详解(含编程示例)

ShenDu_Linux

Linux 程序员 内存 进程

Redis系统学习之redis内存模型

linux亦有归途

数据库 redis C/C++

第二周 框架设计 学习总结

feiyun123

极客大学架构师训练营 框架设计

离开

成周

vue高级进阶系列——用typescript玩转vue和vuex

徐小夕

Java vue.js Vue 前端

架构师训练营第二周课后作业

万有引力

架构师训练营第十一周

我是谁

极客大学架构师训练营

系统安全与高可用

天天向上

架构师训练营第 11 周学习总结

netspecial

极客大学架构师训练营

几个大厂的研发类面试题你知道多少?(C/C++工程师方向)

linux大本营

c++ Linux 后台开发 架构师

我膨胀了,测试必要商城小程序,用了3种方式!:)

清菡

App

Defi系统APP开发|Defi软件开发

开發I852946OIIO

系统开发

智慧公安大数据分析平台开发,警务通APP系统开发

WX13823153201

基于AWS技术实现发布/订阅服务-InfoQ