Jan 11, 2016

Designing Messaging for Scalable Distributed Systems

The cloud has considerably changed the scale of distributed systems. As the size of a system grows, it becomes increasingly difficult to design it and keep it running. To avoid those difficulties, most large scale architectures use loosely coupled technologies. 

The vehicle often used in this journey to the paradise (or inferno) of scaling a system is the message bus. If implemented properly, I think messaging is a highly valuable element in an architecture for near infinite scale. 


If implemented haphazardly...


You get the point...:-)


Introduction 
Let's start with a typical use case - say we have a large retailer who needs a system that transfers product updates from a central point to hundreds of stores. Similarly, the retailer needs the system to collect sales data from all the stores and send it back to the central office for processing. It is a must that messages are not lost due to node or communication failures. Also, this system should be highly available and scalable.

If you notice, a big part of the above distributed system involves reliably producing and consuming data at scale. Actually this is quite common in most large scale distributed systems. So decoupling the production and consumption of data in the above scenario helps considerably in designing a good system. 


Why? - Decoupling removes the dependencies between components that are distributed. It also reduces the need for coordination and synchronization between components. 
So our starting point is decoupling...

Decoupling in Space, Time & Synchronization

Space Decoupling 
Coupling in space happens when the processes in a distributed system make assumptions about locality for interaction, such as the availability of a service locally or at a fixed IP address. If a design has spatial coupling, the system will be rigid and break easily whenever a process moves or changes. 
Space decoupling means processes do not have to know each other to interact. 
Time Decoupling 
Time decoupling is NOT the same as asynchronous messaging. Coupling in time happens whenever both the sender and receiver processes are expected to be available at the same time. It does not matter whether the interaction is synchronous or asynchronous.
Time decoupling means processes do not have to actively interact at same time.
Synchronization Decoupling
In large scale distributed systems, multiple services interact with each other. Coupling in synchronization happens when one of the interacting parties waits for the other party to complete its activity.
Synchronization decoupling means each process in interaction can proceed independently.
Communication Paradigms
Now that we have defined what decoupling means, the next step is to evaluate the decoupling abilities of the communication paradigms at our disposal. Following are some common interaction paradigms for building distributed systems - 
  1. Message Passing
  2. RPC (remote procedure calls)
  3. Notifications/Observers
  4. Distributed Shared Memory
  5. Message Queues
  6. Publish-Subscribe


Message Passing:

This model is, in some ways, the grand-daddy of all the other communication models above. In this model, components communicate with each other by simply sending and receiving messages.

Typically message passing is asynchronous for the producer and synchronous for the consumer. The producer knows the consumer (i.e., space coupling). Similarly, the producer and consumer both have to be active at the same time (i.e., time coupling).
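This asymmetry can be sketched with a tiny in-process example (the names here are illustrative): the producer's send returns immediately, while the consumer blocks on receive, so both sides must be active together.

```python
# Minimal message-passing sketch: put() is asynchronous for the producer,
# get() is synchronous (blocking) for the consumer.
import queue
import threading

channel = queue.Queue()          # the communication channel
received = []

def consumer():
    # the consumer blocks until a message arrives (time coupling)
    while True:
        msg = channel.get()
        if msg is None:          # sentinel used to shut down
            break
        received.append(msg)

t = threading.Thread(target=consumer)
t.start()

# the producer's send returns immediately
for i in range(3):
    channel.put(f"update-{i}")
channel.put(None)
t.join()

print(received)                  # ['update-0', 'update-1', 'update-2']
```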


Remote Procedure Calls (RPC):

This is one of the most common communication models in distributed systems. RPC is popular for good reasons - ease of use, performance etc. RPC examples - RMI, DCOM, Thrift etc. 

RPC calls involve holding a reference to a remote object and invoking methods on it (i.e., space coupling). Similarly, the remote object has to be available (i.e., time coupling) and execute the method synchronously (i.e., synchronization coupling).


We do have some alternatives that try to break this strong coupling.  Examples -

  • Fire-n-Forget : In this variant, the sender invokes the method and does not wait for success or failure. This enables us to achieve decoupling on the synchronization dimension.
  • Futures : In this variant, the sender invokes the method and continues with its processing. The sender can request the return value of the remote invocation later via a handle.
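The Futures variant maps directly onto Python's concurrent.futures; here the "remote" call is simulated locally with a thread pool, which is an assumption of this sketch rather than a real RPC stack:

```python
# Futures sketch: the caller fires the invocation, keeps working, and
# collects the result later via the future handle (synchronization decoupling).
from concurrent.futures import ThreadPoolExecutor
import time

def slow_remote_call(x):
    time.sleep(0.1)              # stands in for network + processing latency
    return x * 2

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_remote_call, 21)   # returns immediately
    local_work = sum(range(10))                  # caller proceeds independently
    result = future.result()                     # rendezvous happens here

print(local_work, result)        # 45 42
```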

Observers/Notifications:

In this model, the sender invokes an API on the receiver and also registers a callback. The receiver sends a reply later using this callback. Sometimes this model is extended so that the receiver sends multiple replies to the sender using the callback.

This is a fairly common communication model and is used in many contexts. Examples - Job status updates, updates for ensuring consistency of web caches etc.


In this model the sender knows the receiver (i.e., space coupling). Similarly, both sender and receiver have to be available at the same time (i.e., time coupling). What the callback provides is decoupling in synchronization. This model can become challenging, though, as the system grows in size.
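A minimal sketch of the observer model (class and field names here are hypothetical): the sender registers a callback so it does not block waiting for the reply, but it still names the receiver directly and both must be running.

```python
# Observer/notification sketch: callbacks give synchronization decoupling,
# but space and time coupling remain.
class JobRunner:
    def __init__(self):
        self._observers = []

    def register(self, callback):
        self._observers.append(callback)

    def run(self, job_id):
        # ... do the work, then notify every registered observer;
        # a real system might notify multiple times (e.g., progress updates)
        for notify in self._observers:
            notify(job_id, "done")

updates = []
runner = JobRunner()
runner.register(lambda job, status: updates.append((job, status)))
runner.run("job-42")
print(updates)                   # [('job-42', 'done')]
```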


Distributed Shared Memory (DSM):

Think of In-Memory Data Grids or Distributed Shared Caches. Basically, in this model all hosts share a common space that is spread across multiple nodes. Synchronization and communication between the nodes or processes take place through operations on the shared data.  

This interaction model provides time-based and space-based decoupling. For example, 

  • To create data, the sender does not have to know who the receiver is or how the data will be used in future (i.e., space decoupling). 
  • Similarly, for the data to be consumed, the sender does not have to be available at the same time as the consumer (i.e., time decoupling).

This model allows us to use both one-to-one and one-to-N methods of data delivery. 
One constraint in this model, though, is that producers typically insert data into the shared space asynchronously while consumers read/pull data synchronously (i.e., synchronization coupling).

To circumvent this, we can use the asynchronous notification mechanisms available in most IMDG libraries. This allows one to achieve all three decouplings, i.e., space, time and synchronization decoupling.
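A toy in-process stand-in for a data grid with change notifications (the names here are illustrative, not any real IMDG API): writers put values into the shared space, and readers either pull by key or register a listener to be notified asynchronously.

```python
# Shared-space sketch: put/get give space and time decoupling; the listener
# mechanism adds synchronization decoupling on the consumer side.
class SharedSpace:
    def __init__(self):
        self._data = {}
        self._listeners = []

    def add_listener(self, callback):
        self._listeners.append(callback)

    def put(self, key, value):
        self._data[key] = value
        for notify in self._listeners:   # push-style notification
            notify(key, value)

    def get(self, key):                  # pull-style read
        return self._data.get(key)

seen = []
space = SharedSpace()
space.add_listener(lambda k, v: seen.append((k, v)))
space.put("store-17/sales", 1234)        # producer never names a consumer
print(space.get("store-17/sales"), seen)
```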


Message Queues:

In this model, all hosts share a queue for communication. The queue is like a global shared space but with some differences. For example, message queues typically provide additional functionality like transactions, timing and ordering etc.

Similar to DSMs, both producers and consumers are decoupled in space and time. Queues do not provide synchronization decoupling, though, as consumers have to pull messages one after another. 


We can certainly use callbacks to provide asynchronous message delivery, but that becomes too complicated as the size of the system grows, because of the additional interactions needed to maintain transaction, timing and ordering guarantees etc. 


Also, the above does not make much sense. The reason we would use queues over, say, a publish-subscribe model is for ordering, timing etc. So having async callbacks on the queue for consumption defeats the purpose of using a queue.


Publish-Subscribe Model

In recent years the publish-subscribe model has become pretty popular and I think for a good reason. The strength of this interaction style is it provides full decoupling i.e., decoupling in time, space & synchronization between publishers and subscribers. 






The most widely used publish-subscribe models are
  1. Topic-based pub-sub model
  2. Content-based pub-sub model
  3. Type-based pub-sub model

Topic based publish-subscribe:
This model is based on the notion of topics or subjects. In this model, senders publish events to topics and subscribers subscribe to individual topics. The topics are typically identified by keywords. 



In some ways topics are like groups in context of group communication. So subscribing to a topic is equivalent to becoming a member of a group. Publishing to a topic is equivalent to broadcasting to a group. 

There are a couple of differences though -
  1. Generally groups are used in the context of maintaining strong consistency between replicas etc., whereas topics are used in the context of distributed interactions. 
  2. Groups typically use flat addressing whereas topics use hierarchical addressing. The advantage of hierarchical addressing is that a subscriber can subscribe to a node in the hierarchy, which implicitly subscribes it to all sub-topics of that node. 
Most current generation messaging systems support wildcards. This allows subscribers to register interest, via a pattern, in an entire sub-tree of topics.
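Wildcard matching over a topic hierarchy can be sketched in a few lines. This is loosely modeled on AMQP-style routing keys ('*' matches one segment, '#' matches the rest of the sub-tree); the exact syntax varies by broker, so treat this as illustrative:

```python
# Hierarchical topic matching with wildcards over dot-separated segments.
def topic_matches(pattern, topic):
    p_parts, t_parts = pattern.split("."), topic.split(".")
    for i, p in enumerate(p_parts):
        if p == "#":                       # matches the entire sub-tree
            return True
        if i >= len(t_parts):
            return False
        if p != "*" and p != t_parts[i]:   # '*' matches exactly one segment
            return False
    return len(p_parts) == len(t_parts)

print(topic_matches("retail.products.#", "retail.products.us.store17"))  # True
print(topic_matches("retail.*.sales", "retail.us.sales"))                # True
print(topic_matches("retail.*.sales", "retail.us.west.sales"))           # False
```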

Content based publish-subscribe:
Given the capabilities (hierarchies, wildcards etc.) available in the topic-based model, why do we even need to consider content-based publish-subscribe? 

One disadvantage of topic-based publish-subscribe is that the subscription scheme is static and limited in expressiveness. Content-based publish-subscribe allows a subscriber to register interest based on the properties of the event. In other words, messages are not classified according to some pre-defined external criteria. Instead, the metadata of the event defines the criteria.

In this model, consumers typically subscribe to messages selectively by specifying filters, using some sort of subscription language specific to that broker. The filters often define constraints as name:value pairs along with comparison operators (like <, =, >, ...) etc.
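A minimal content-based subscription sketch: instead of a topic name, each subscriber registers a predicate over the event's attributes. The function names and event fields here are illustrative, not any real broker's subscription language:

```python
# Content-based routing: subscribers register predicates, and publish()
# delivers each event only to subscribers whose predicate matches.
subscribers = []

def subscribe(predicate, handler):
    subscribers.append((predicate, handler))

def publish(event):
    for predicate, handler in subscribers:
        if predicate(event):
            handler(event)

alerts = []
# equivalent of a broker filter like: type = 'sale' AND amount > 1000
subscribe(lambda e: e["type"] == "sale" and e["amount"] > 1000,
          lambda e: alerts.append(e["store"]))

publish({"type": "sale", "store": "store-1", "amount": 250})
publish({"type": "sale", "store": "store-2", "amount": 5000})
print(alerts)                    # ['store-2']
```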

Another area where this model is useful is subscribing to event correlations, i.e., subscribers register interest in a logical combination of events. The most common ways of representing content-based patterns are via strings or template objects.

Finally, the content-based model enables fine granularity in the subscription scheme. To achieve the same with a topic-based model, either the subscriber has to filter out irrelevant events or the system has to provide a huge number of topics. Both workarounds are inefficient.

If the content-based model is so good, why use the topic-based model?

A disadvantage of the content-based model is that the system can quickly become complex, without coherent structure and consistency. Structure and consistency are important, especially when you have multiple geo-separated teams working on a distributed system. 


Another disadvantage is that the topic-based scheme is simple and can be implemented very efficiently, whereas content-based schemes have additional overhead.

So it is best to use the model (i.e., topic-based or content-based) that makes the most sense for the underlying system. One can also use the content-based model selectively within a topic-based model.

Type-based publish-subscribe:
Events on a topic typically have the same structure. So the idea in this model is to use the type of the object as the scheme instead of string-based topic names. 

The main advantage of this model is tighter integration of the language with the message broker. Using types enables type-safety checks at compile time and type casting at run time.

Message Broker Architectures
Now that we have covered decoupling and the communication paradigms, and taken a deep dive into the publish-subscribe model, our next stop is message brokers.

The primary goal of any publish-subscribe system is to facilitate exchange of events between producers and consumers in an asynchronous manner. There are two ways this asynchrony can be implemented.

Centralized architecture
In this model the producer sends messages to a broker. The broker in turn forwards the messages to consumers on demand. Ex: RabbitMQ





Typically, applications based on this model have strong requirements in terms of reliability, data consistency or transactional support. In centralized architectures, the message broker provides these capabilities. This is also one reason message brokers are often built on a centralized database or a distributed commit log. 

There are a few advantages with the centralized architecture model. 
  • First, senders and receivers do not need to know about each other. All they need to know is the address of the broker(s). The broker then takes care of routing etc. (i.e., space decoupling).
  • Second, the message sender and receiver do not have to be available at the same time. The broker can take care of persisting the messages and forwarding them at a later time (i.e., time decoupling).
  • Third, the broker model is to some extent resilient to failures in the producer and consumer implementations. If a particular producer or consumer is buggy and prone to failure, the messages already in the broker are retained.
There are also a couple of drawbacks with the centralized architecture model - 
  • First, it is a chattier model, i.e., publisher -> broker and broker -> subscriber.
  • Second, the broker can become a bottleneck, as all messages must pass through it.
  • Third, the broker is a single point of failure unless clustered.

Decentralized Architecture
In this model, the smart communication requirements are built directly into the producer and consumer processes. The producers and consumers do not need an entity (i.e., a message broker) in the middle to provide asynchrony, reliability, consistency etc.

These architectures don't have a single point of failure and are generally well suited for fast and efficient delivery of transient data. On the other hand, upgrades and maintenance of the producers and consumers, especially when they are spread out, are quite a challenge with this model. 

Message Broker Networks
The main design goal of message broker networks is to provide availability, scalability and fault tolerance. A typical broker network consists of many brokers placed in some topology; they work by routing messages to other brokers, hiding the fact that there are many brokers in the network.

For example, in RabbitMQ various facilities exist to create a highly scalable and distributed broker network using Clustering, Federation and Shovels.
Similarly, Apache Kafka is another interesting broker for building a high throughput distributed message broker network. Kafka is based on the concept of distributed partitioned commit logs and uses ZooKeeper for coordination primitives.

Quality of Service (QoS) Levels
Whichever message broker we choose for our distributed application, we need a deeper understanding of the messaging service levels and guarantees provided by that broker. Following are some common services we find in most message brokers -  
  1. persistence, 
  2. priorities, 
  3. transactional guarantees and 
  4. reliability guarantees.

Persistence: 
In RPC-like systems, the method invocation is by definition a transient event. The lifetime of a remote invocation is short, and if the invoker does not get a reply, the request gets reissued. 

In publish-subscribe or queuing systems, the scenario is a bit different. Messages could be processed hours after they are sent. 
Persistence is generally present in most publish-subscribe systems that have a centralized architecture. The message brokers store messages until consumers are able to process them. 

In RabbitMQ, for example, persistence is achieved by declaring queues as durable and marking messages as persistent, so the broker writes them to disk.
Decentralized architectures generally do not offer persistence, since messages are sent directly from producer to consumer, unless the producer itself does the persistence. 

Priorities:
Like persistence, priority is a quality of service offered by some messaging systems. Sometimes we have situations where we want to process certain types of messages first (ex: failure notifications) even if other messages are already waiting.

Typically, priorities apply only to messages that are in transit, i.e., messages that are not yet processed. Runtime execution priorities are handled by the application. This means we cannot guarantee that processing of high priority messages completes first, even if the underlying queue is a FIFO; it depends on the speed of the subscribers. 
So it is best to consider the priority guarantees of a message broker as a best-effort quality of service (unlike persistence).
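The "in transit" part is easy to see with a toy priority queue: delivery order follows priority rather than arrival order, but once a message is handed to a subscriber the broker has no further say.

```python
# Priority applies only while messages wait in the queue: lower numbers
# are served first, regardless of arrival order.
import queue

q = queue.PriorityQueue()
q.put((5, "metrics-report"))     # (priority, payload); lower = more urgent
q.put((1, "failure-alert"))
q.put((3, "inventory-update"))

drained = [q.get()[1] for _ in range(3)]
print(drained)                   # ['failure-alert', 'inventory-update', 'metrics-report']
```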
Transactions:
In messaging, transactions are generally used to group the processing of multiple messages into atomic units, i.e., all or none. In other words, either all messages are sent (received) or none are.

The range of transaction mechanisms provided varies by broker. Generally, brokers built on a database provide fairly rich transaction mechanisms. Ex: IBM MQSeries. JMS also provides a transactional mechanism, although it is within the context of a session.

Reliability:
Reliability is a fairly important feature of distributed messaging. We often need strong guarantees about the reliable delivery of messages to subscribers. 

Centralized publish-subscribe systems typically use reliable point-to-point channels to communicate with subscribers and keep the copies of the messages on stable storage. This way even if the message broker is down, the messages are delivered eventually when the broker is up again.

The reliability guarantees provided by brokers are typically 
  • At-most-once
  • At-least-once and 
  • Exactly-once.
At-most-once is the fastest mode. In this mode the sender does not wait for an acknowledgement from the broker. Similarly, the broker does not wait for an acknowledgement from the subscriber. The downside is that if there is a connection or host failure, the message could be lost.

At-least-once means the sender sends a message and, if there is no acknowledgement, keeps retrying until an acknowledgement is received. It is the responsibility of the receiver to handle duplicate messages. Typical approaches are to either make the operations idempotent so that processing duplicate messages is not an issue, or to have duplicate detection logic in the subscriber to avoid processing duplicates.
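The duplicate-detection approach can be sketched in a few lines: the subscriber remembers processed message ids and skips redeliveries. A production version would bound or persist the seen set; this is just the bare idea, with illustrative field names.

```python
# Duplicate detection for an at-least-once channel: redeliveries of a
# message id already in 'seen_ids' are silently skipped.
seen_ids = set()
processed = []

def handle(message):
    if message["id"] in seen_ids:        # redelivery -- already processed
        return
    seen_ids.add(message["id"])
    processed.append(message["body"])

handle({"id": "m-1", "body": "order created"})
handle({"id": "m-1", "body": "order created"})   # broker retried delivery
handle({"id": "m-2", "body": "order shipped"})
print(processed)                 # ['order created', 'order shipped']
```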

Exactly-once means the broker will try to guarantee that the message is delivered exactly once. This is the slowest of the three reliability QoS levels. While many brokers claim they support "exactly-once", I think that is an illusion. So one is better off going with at-least-once and handling duplicates at the application level.

Message Types

Quite often we can summarize the messages exchanged in a distributed application into one of the following 4 categories:
  • Command Messages
  • Document Messages
  • Event Messages
  • Sensor Data Messages

Command Messages
As the name implies, this message type is used to execute an action or procedure on the receiving application (or a series of applications). 

Typically the instructions are included either through headers and attributes or in the message body. It is also a good idea to include additional attributes in the message like command-id, request-id, correlation-id, sender, timestamp etc. These attributes will help later to correlate the responses. 

Usually command messages are sent on point-to-point channels and often require that the command be executed only once. It is good to handle duplicate detection on the subscriber side; otherwise we need to make sure the command execution is an idempotent operation. 

Another attribute that is useful to include in the command message's header is the response-to field. This enables the command receiver to post its response on the topic or routing-key identified by the response-to field.
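Putting the attributes above together, a command message envelope might look like the following sketch; every field name here is illustrative, not a standard:

```python
# Hypothetical command message envelope with the correlation attributes
# suggested above (command-id, correlation-id, sender, timestamp, response-to).
import uuid
from datetime import datetime, timezone

def make_command(action, payload, reply_to):
    return {
        "command-id": str(uuid.uuid4()),
        "correlation-id": str(uuid.uuid4()),
        "sender": "central-office",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "response-to": reply_to,          # where the receiver posts its reply
        "action": action,
        "body": payload,
    }

cmd = make_command("update-price", {"sku": "A-100", "price": 9.99},
                  reply_to="replies.central-office")
print(sorted(cmd))
```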

Document Messages
This type of message is typically used to transfer data between components. The data could be a piece of configuration, a serialized object etc. The important part of a document message is the content. 

The content of the message is typically in a structured format (ex: JSON, XML etc). Usually document messages are sent on publish-subscribe channels. In a request-reply scenario, the reply could be a document message where the return value is the document.

Event Messages
Many applications use event notifications to coordinate actions and like to use messaging to communicate those events. Three important characteristics of event messages are -
  1. The event messages do not require a response
  2. Event messages can usually also be modeled as time series data.
  3. Typically event messages are sent on publish-subscribe channels.

Sensor Messages
The sensor message category covers a broad spectrum of messages. Some important characteristics of these messages are -
  • Typically sensor messages are time series data (i.e., point-in-time values).
  • These messages usually do not require a response.
  • These messages are usually sent on publish-subscribe channels.
  • The data in these messages is typically WORM (i.e., write-once, read-many).
  • Often these messages are also fed into analytics and stream processing.
Examples of sensor messages include --
  • Status Messages
  • Metric Messages
  • State Messages
  • Alert Messages
  • Event Messages etc

Some Messaging Patterns...
There are a whole bunch of messaging patterns, and one can find them with a quick Google search. I thought I would cover some of the more common patterns -
  • Pipes-n-Filters pattern
  • Scatter-n-Gather pattern
  • Competing consumers pattern
  • Aggregator pattern

Pipes-n-Filters Pattern
Usually this pattern is used when one wants to divide a larger processing task into a sequence of smaller, independent processing steps (filters) that are connected by channels (pipes). 

Typically this pattern is useful where the processing required by the system can be broken down into discrete and independent steps. Another reason to use this pattern is if the individual steps have different scalability requirements.


Some simple examples of this pattern are
  • pipeline: metrics -> monitoring -> alerts -> notifications
  • pipeline: status -> monitoring -> alerts -> notifications
  • pipeline: metrics -> aggregations -> persistence
  • pipeline: metrics -> persistence

This pattern can improve decoupling, performance, scalability and reusability by allowing individual elements to be deployed and scaled independently. The key is to standardize on the data format each component receives and emits. 

Some advantages of the pipes-n-filters pattern -
  1. The pipeline structure provides opportunities for running parallel instances of slow filters, enabling the system to spread the load and improve throughput.
  2. If the input and output of the filters are structured as streams, the filters in the pipeline can execute in parallel. 
  3. Another benefit of this model is resiliency, i.e., should one of the filters become a bottleneck or no longer be available, the pipeline may be able to reschedule work and route it through another instance of the component. In other words, failure of a single filter does not result in failure of the entire pipeline.
A couple of drawbacks -
  1. Complexity, i.e., the increased flexibility and modularity introduce complexity. 
  2. Repeated messages, i.e., if a filter posts a message to the next stage in the pipeline and then fails, another instance of that filter could post the message again.
In general it is a good idea to build the filters as idempotent filters. Similarly, it is good to make sure each filter runs in isolation and makes no assumptions about context beyond what is passed to it. 
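The pipeline idea can be sketched with generators: each filter consumes a stream and emits a stream, so stages can be recombined freely. The stage names loosely match the example pipelines above (metrics -> aggregations -> persistence); the data is made up for illustration.

```python
# Pipes-n-filters with generators: each stage is an independent filter
# connected to the next by a lazily-consumed stream (the "pipe").
def metrics_source():
    yield {"store": "s1", "sales": 100}
    yield {"store": "s2", "sales": 250}

def aggregate(stream):
    total = 0
    for event in stream:
        total += event["sales"]
        yield {"running-total": total}

def persist(stream, sink):
    for record in stream:
        sink.append(record)      # stand-in for a datastore write

db = []
persist(aggregate(metrics_source()), db)
print(db)                        # [{'running-total': 100}, {'running-total': 350}]
```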

Competing Consumers Pattern

This pattern enables multiple consumers to concurrently process messages from a message channel. Typically it is used to improve the throughput, scalability and availability of the system and to balance the workload.

Note: the order in which the processing of messages completes is not guaranteed. So if your application requires tasks or messages to be processed sequentially, this pattern is not suitable. 


If your system has an auto-repair mechanism that automatically restarts failed processes for resiliency, then it may be necessary to implement the message processing as idempotent operations, since the same message could end up being processed multiple times.


Results handling is another area to watch in this pattern. If the system needs the results generated by the competing consumers, there should be some location that both the system and the consumers can access. The system also needs a way to know when processing is complete.
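Competing consumers can be sketched with worker threads sharing one queue: each message is processed by exactly one worker, and completion order is not guaranteed, which is exactly why ordering-sensitive work does not fit this pattern.

```python
# Competing consumers: three workers pull from a shared queue; each task
# is delivered to exactly one worker. A None sentinel shuts each worker down.
import queue
import threading

tasks = queue.Queue()
results = []
lock = threading.Lock()

def worker(name):
    while True:
        task = tasks.get()
        if task is None:
            break
        with lock:
            results.append((name, task))

workers = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(3)]
for w in workers:
    w.start()
for t in range(6):
    tasks.put(t)
for _ in workers:                # one sentinel per worker
    tasks.put(None)
for w in workers:
    w.join()

# every task processed exactly once, in no guaranteed order
print(sorted(task for _, task in results))   # [0, 1, 2, 3, 4, 5]
```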


Scatter-Gather Pattern

Conceptually this is a fairly simple pattern. The steps are -
  1. The requester sends the request to the scatter component. 
  2. The scatter component broadcasts this request to multiple providers. 
  3. Each provider processes the request and sends a reply to the gather component.
  4. The gather component aggregates all replies and sends the aggregate to the requester.

To associate a reply message with a particular request message, the requester and providers must agree on a mechanism for request-reply correlation. So the requester labels each request with a unique identifier and sends it to the scatter component, which broadcasts the request to all providers.

Each provider receives the request, retrieves the identifier from the request, inserts it into the reply message, and sends the reply to the gather component. The gather component aggregates all replies for a specific request and sends the aggregate to the requester. 

The gather component needs to retrieve the request identifier in each reply in order to correlate the reply with the request. Also, the requester needs to retrieve the request identifier included in each aggregate, in order to correlate the aggregate with the request.
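The correlation flow above can be sketched in-process: the request carries a correlation id, each provider copies it into its reply, and the gather step groups replies by that id before aggregating. All names here are illustrative, and the providers run synchronously for simplicity.

```python
# Scatter-gather sketch with request-reply correlation via a unique id.
import uuid

def scatter(request, providers):
    return [provider(request) for provider in providers]   # broadcast

def gather(replies, request_id):
    matching = [r["value"] for r in replies if r["correlation-id"] == request_id]
    return {"correlation-id": request_id, "aggregate": sum(matching)}

def make_provider(stock):
    def provider(request):
        # each provider copies the request's id into its reply
        return {"correlation-id": request["correlation-id"], "value": stock}
    return provider

request = {"correlation-id": str(uuid.uuid4()), "query": "stock of SKU A-100"}
replies = scatter(request, [make_provider(5), make_provider(12), make_provider(3)])
result = gather(replies, request["correlation-id"])
print(result["aggregate"])       # 20
```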

Aggregator pattern
The aggregator pattern receives a stream of messages and groups the messages that are correlated. Once a complete set of messages has been received, the aggregator publishes them as a single aggregated message.

Simple... isn't it? Actually it is a tricky pattern once we get into the details. When designing an aggregator, we need to pre-define the following -
  • Correlation criteria i.e., how can aggregator determine the incoming messages are correlated?
  • Completeness criteria i.e., how can aggregator determine it has complete set?
  • Aggregation criteria i.e., how should aggregator aggregate the messages?
There are a number of strategies for each of the above criteria, and they deserve their own post. We will come back to this some other day.
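A minimal aggregator sketch with the three criteria made explicit (the message shape and the fixed part count are assumptions for illustration): correlation = group by order id, completeness = a fixed number of parts seen, aggregation = merge the parts into one message.

```python
# Aggregator sketch: group correlated messages, publish once complete.
EXPECTED_PARTS = 3               # completeness criterion (assumed fixed here)
pending = {}                     # correlation id -> parts received so far
published = []

def on_message(msg):
    parts = pending.setdefault(msg["order-id"], [])        # correlation
    parts.append(msg["part"])
    if len(parts) == EXPECTED_PARTS:                       # completeness
        published.append({"order-id": msg["order-id"],     # aggregation
                          "parts": sorted(parts)})
        del pending[msg["order-id"]]

on_message({"order-id": "o-1", "part": "payment"})
on_message({"order-id": "o-2", "part": "payment"})
on_message({"order-id": "o-1", "part": "inventory"})
on_message({"order-id": "o-1", "part": "shipping"})
print(published)                 # one aggregate for o-1; o-2 still pending
```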

Some tips...
Finally some tips I came across over time...
  1. Communicate asynchronously as much as possible. 
  2. Do not call long running processes synchronously from a subscriber.
  3. Be aware that a connection will not always be present and messages may need to be stored.
  4. If you store messages, be aware that your disk can become full. 
  5. Consider how to handle the case when a response message is not received.
  6. Ensure your message bus can scale.
  7. Standardize on the message formats. Use standard protocols for interoperability.
  8. Minimize volume of data sent across the network. Send only necessary data.
  9. Consider sending coarse messages to minimize frequency of network calls.
  10. Don't publish everything to the bus, i.e., avoid overcrowding the message bus.

Wrap up...
I hope you found the above quick overview useful. Messaging in scalable distributed systems is a vast arena and the above just scratches the surface. Feel free to leave any suggestions and corrections in the comments.