Enterprise Integration PatternsMessaging Patterns
HOME    PATTERNS    RAMBLINGS    ARTICLES    TALKS    DOWNLOAD    BOOKS    CONTACT

Publish-Subscribe ChannelPublish-Subscribe Channel

Messaging Patterns

Previous Previous   Next Next

An application is using Messaging to announce events.

How can the sender broadcast an event to all interested receivers?

Send the event on a Publish-Subscribe Channel, which delivers a copy of a particular event to each receiver.

A Publish-Subscribe Channel works like this: It has one input channel that splits into multiple output channels, one for each subscriber. When an event is published into the channel, the Publish-Subscribe Channel delivers a copy of the message to each of the output channels. Each output channel has only one subscriber, which is only allowed to consume a message once. In this way, each subscriber only gets the message once and consumed copies disappear from their channels.

...

Example: Google Cloud Pub/SubNEW

Google Cloud Pub/Sub offers both Competing Consumers and Publish-Subscribe Channel semantics, managed through topics (Publish-Subcribe) and subscriptions (Competing Consumers) as illustrated in this diagram:


Google Cloud Pub-Sub Concept Overview

In this example, both Subscriber Y and Subscriber Z each receive a copy of Message 3 as they are subscribing to the same Topic C, but through separate subscriptions. Google Cloud Pub/Sub does not support wildcard subscriptions.

Google provides client API libraries that make coding against the cloud service relatively simple. The client code has to authenticate first and create topics before messages can be published. Subscriptions are created on the fly.

A stand-alone application can authenticate using private keys:

Pubsub pubsub;
    
void createClient(String private_key_file, String email) throws IOException, GeneralSecurityException {
  HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
  GoogleCredential credential = new GoogleCredential.Builder()
      .setTransport(transport)
      .setJsonFactory(JSON_FACTORY)
      .setServiceAccountScopes(PubsubScopes.all())
      .setServiceAccountId(email)
      .setServiceAccountPrivateKeyFromP12File(new File(private_key_file))
      .build();
    pubsub = new Pubsub.Builder(transport, JSON_FACTORY, credential)
      .setApplicationName("eaipubsub")
      .build();
}

using the PubSub instance, you can create a new topic if it doesn't already exist:

Topic createTopic(String topicName) throws IOException {
  String topic = getTopic(topicName); // adds project name and resource type
  Pubsub.Projects.Topics topics = pubsub.projects().topics();
  ListTopicsResponse list = topics.list(project).execute();
  if (list.getTopics() == null || !list.getTopics().contains(new Topic().setName(topic))) {
      return topics.create(topic, new Topic()).execute();
  } else {
      return new Topic().setName(topic);
  }
}

Now the code is ready to publish a message to the topic, returning the published message IDs:

List<String> publishMessage(String topicName, String data) throws IOException {
  List<PubsubMessage> messages = Lists.newArrayList();
  messages.add(new PubsubMessage().encodeData(data.getBytes("UTF-8")));
  PublishRequest publishRequest = new PublishRequest().setMessages(messages);
  PublishResponse publishResponse = pubsub.projects().topics()
      .publish(getTopic(topicName), publishRequest)
      .execute();
  return publishResponse.getMessageIds();
}

Subscriptions are identified by a name and a topic so multiple subscribers can compete for messages off the same subscription or create individual subscriptions for the same topic. If a subscription already exists, the underlying REST API throws an exception, which we can simply ignore.

Subscription subscribeTopic(String subscriptionName, String topicName) throws IOException {
  String sub = getSubscription(subscriptionName);  // adds project name and resource type
  Subscription subscription = new Subscription()
    .setName(sub)
    .setAckDeadlineSeconds(15)
    .setTopic(getTopic(topicName));
  try {
    return pubsub.projects().subscriptions().create(sub, subscription).execute();
  } catch (GoogleJsonResponseException e) {
    if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
        return subscription;
    } else {
        throw e;
    }
  }
}

Now everything is in place to pull messages off the subscription. The code is a bit verbose because it has to navigate collections that allow receiving multiple messages at once. It also has to convert the payload back into a String for this simple demo implementation:

List<String> pullMessage(Subscription subscription, int maxMessages, boolean doAck) throws IOException {
  PullRequest pullRequest = new PullRequest()
          .setReturnImmediately(true)
          .setMaxMessages(maxMessages);
  PullResponse response = pubsub.projects().subscriptions().pull(subscription.getName(), pullRequest).execute();
  List<ReceivedMessage> messages = response.getReceivedMessages();
  List<String> ackIds = Lists.newArrayList();
  List<String> data = Lists.newArrayList();
  if (messages != null) {
      for (ReceivedMessage receivedMessage : messages) {
          PubsubMessage message = receivedMessage.getMessage();
          if (message != null) {
              byte[] bytes = message.decodeData();
              if (bytes != null) {
                  data.add(new String(bytes, "UTF-8"));
              }
          }
          ackIds.add(receivedMessage.getAckId());
      }
      if (doAck) {
          AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
          pubsub.projects().subscriptions().acknowledge(subscription.getName(), ackRequest).execute();
      }
  }
  return data;
}

Find the source for this code snippet on Github.

Related patterns: Competing Consumers, Durable Subscriber, Event Message, Message, Message Channel, Message Store, Messaging, JMS Publish/Subscribe Example, Point-to-Point Channel, Request-Reply


Want to keep up-to-date? Follow My Blog.
Want to read more in depth? Check out My Articles.
Want to see me live? See where I am speaking next.

Enterprise Integration Patterns Find the full description of this pattern in:
Enterprise Integration Patterns
Gregor Hohpe and Bobby Woolf
ISBN 0321200683
650 pages
Addison-Wesley

From Enterprise Integration to Enterprise Transformation:

My new book describes how architects can play a critical role in IT transformation by applying their technical, communication, and organizational skills with 37 episodes from large-scale enterprise IT.

DRM-free eBook on Leanpub.com

Print book on Amazon.com

Creative Commons Attribution License Parts of this page are made available under the Creative Commons Attribution license. You can reuse the pattern icon, the pattern name, the problem and solution statements (in bold), and the sketch under this license. Other portions of the text, such as text chapters or the full pattern text, are protected by copyright.


Table of Contents
Preface
Introduction
Solving Integration Problems using Patterns
Integration Styles
File Transfer
Shared Database
Remote Procedure Invocation
Messaging
Messaging Systems
Message Channel
Message
Pipes and Filters
Message Router
Message Translator
Message Endpoint
Messaging Channels
Point-to-Point Channel
Publish-Subscribe Channel
Datatype Channel
Invalid Message Channel
Dead Letter Channel
Guaranteed Delivery
Channel Adapter
Messaging Bridge
Message Bus
Message Construction
Command Message
Document Message
Event Message
Request-Reply
Return Address
Correlation Identifier
Message Sequence
Message Expiration
Format Indicator
Interlude: Simple Messaging
JMS Request/Reply Example
.NET Request/Reply Example
JMS Publish/Subscribe Example
Message Routing
Content-Based Router
Message Filter
Dynamic Router
Recipient List
Splitter
Aggregator
Resequencer
Composed Msg. Processor
Scatter-Gather
Routing Slip
Process Manager
Message Broker
Message Transformation
Envelope Wrapper
Content Enricher
Content Filter
Claim Check
Normalizer
Canonical Data Model
Interlude: Composed Messaging
Synchronous (Web Services)
Asynchronous (MSMQ)
Asynchronous (TIBCO)
Messaging Endpoints
Messaging Gateway
Messaging Mapper
Transactional Client
Polling Consumer
Event-Driven Consumer
Competing Consumers
Message Dispatcher
Selective Consumer
Durable Subscriber
Idempotent Receiver
Service Activator
System Management
Control Bus
Detour
Wire Tap
Message History
Message Store
Smart Proxy
Test Message
Channel Purger
Interlude: Systems Management Example
Instrumenting Loan Broker
Integration Patterns in Practice
Case Study: Bond Trading System
Concluding Remarks
Emerging Standards
Appendices
Bibliography
Revision History