Jan 23, 2016

Designing Messaging for Scalable Distributed Systems - Part 2

Recently I had a few discussions on messaging. I thought it would be an interesting follow-up to explore some additional considerations that are important but typically don't surface in the early phases of messaging system design.

The focus of this post is on the following aspects -
  1. Which scenarios in system design are better suited for async messaging?
  2. What are the issues to consider when implementing distributed messaging functionality?
  3. How can we make the distributed messaging solution easier to monitor, debug & support?

So which scenarios are better suited for async messaging?

Decoupling - 
This is probably the most obvious one - the sender and receiver components can stay responsive and not block on each other. Similarly, the sender and receiver do not have to be running at the same time (time decoupling). Finally, the sender and receiver do not have to run on the same machine (space decoupling). Most distributed systems benefit from having these attributes, and messaging is one good way to incorporate them.

Load Balancing - 
Load balancing is a key piece in providing high availability and scalability for distributed applications. Typically the load balancing functionality is provided either via hardware load balancers or via dispatchers (i.e., software load balancers).

The above is a good solution, but there are some caveats...
  1. First, the load on the given system might not be synchronous traffic like HTTP.
  2. Also, the traffic may not be of the request-response type like HTTP traffic.
  3. Another caveat... say our use case is supporting video uploads by users, followed by processing of the uploaded videos. The receiver here needs a good amount of time. Do we block the user?
  4. Another caveat... SPOFs. Our load balancer could be a single point of failure.
Most messaging systems deliver messages from a queue to multiple consumers in round-robin fashion, with each message processed by exactly one consumer. This makes messaging systems a good tool to use for load balancing and addresses the caveats mentioned above.
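
Here is a minimal sketch of that competing-consumers shape in Python, using an in-memory queue and threads as stand-ins for a real broker and separate consumer processes (the names are made up for illustration):

import queue
import threading
import time

work_queue = queue.Queue()              # stand-in for a broker-managed queue

def consumer(worker_id):
    while True:
        msg = work_queue.get()          # each message is delivered to exactly one consumer
        if msg is None:                 # sentinel used to shut the worker down
            break
        time.sleep(0.1)                 # simulate slow processing (e.g., video handling)
        print(f"worker-{worker_id} processed {msg}")
        work_queue.task_done()

workers = [threading.Thread(target=consumer, args=(i,)) for i in range(3)]
for w in workers:
    w.start()

for i in range(9):                      # the sender enqueues without blocking on processing
    work_queue.put(f"upload-{i}")

work_queue.join()                       # wait for the backlog to drain
for _ in workers:
    work_queue.put(None)

If one worker dies, the remaining workers keep draining the queue, which is how this shape sidesteps the dispatcher-as-SPOF caveat (assuming the broker itself is made highly available).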



Load Leveling - 
If the system's traffic pattern is sudden bursts of activity by senders, then asynchronous message queues are a good design to consider. Provisioning a large number of consumers just to absorb the bursts could overwhelm the system. The queues, on the other hand, act as a buffer, enabling consumers to drain them at their own pace.

Deferred Processing -
If the system requirements permit, message queues provide a simple way to defer processing until off-peak hours. Basically, when the senders send messages, the system stores them in the queue. Then, during off-peak hours, the consumers pick up these messages and process them. The off-peak hours could be identified in various ways, such as via configuration or via statistical analysis of system resource usage.
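
A minimal sketch of the idea, assuming the off-peak window is simply a configured range of hours (a real system might derive it from statistical analysis as mentioned above):

import datetime
import queue
import time

OFF_PEAK_HOURS = range(1, 5)            # assumed configuration: 1am to 4am

deferred_queue = queue.Queue()          # senders enqueue here at any time of day

def is_off_peak(now=None):
    now = now or datetime.datetime.now()
    return now.hour in OFF_PEAK_HOURS

def deferred_consumer():
    while True:
        if not is_off_peak():
            time.sleep(60)              # outside the window: leave messages sitting in the queue
            continue
        try:
            msg = deferred_queue.get(timeout=60)
        except queue.Empty:
            continue
        print("processing deferred message:", msg)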

Cross Platform Integrations -
If the system involves multiple platforms & languages, then message queues and topics are a fairly good option to consider. Alternative choices like REST, RPC, etc. are certainly viable and probably more suitable in some scenarios. But if the system is already using messaging and other considerations like the above are in play, then a messaging-based solution is more attractive.

Extensibility -
Messaging-based designs are typically a good choice when it comes to extensibility. Say your system processes alerts and sends email and SNMP notifications. We go ahead and implement it with notification handlers as threads. Sounds good.

Now tomorrow you have a new requirement, i.e., support logging of alerts to an audit trail database. We could certainly go back, change the existing code base, and release a patch. A better alternative is to model the whole thing as pub-sub messaging and register additional subscribers as needed.
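
A minimal in-process sketch of that pub-sub shape (a real system would use broker topics, but the extensibility property is the same) - the audit trail requirement becomes one more subscriber, with no changes to the existing handlers:

from collections import defaultdict

subscribers = defaultdict(list)                     # topic -> list of handler callables

def subscribe(topic, handler):
    subscribers[topic].append(handler)

def publish(topic, alert):
    for handler in subscribers[topic]:              # every subscriber gets every alert
        handler(alert)

# existing handlers
subscribe("alerts", lambda a: print("sending email:", a))
subscribe("alerts", lambda a: print("sending snmp trap:", a))

# tomorrow's requirement: register one more subscriber, existing code untouched
subscribe("alerts", lambda a: print("writing audit trail row:", a))

publish("alerts", {"severity": "high", "msg": "disk almost full"})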

Asynchronous Workflows -
If the system involves complex workflows and they can be executed asynchronously, then messaging is a good option to consider. The individual steps in the workflow can be done by senders & receivers, with coordination via messages.

The message headers can carry the workflow request id to tie together the steps in the workflow. They can similarly carry correlation ids to match requests and responses. Finally, if the steps are designed properly without dependencies, one can build a streaming workflow pipeline where the individual steps execute in parallel.
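
A sketch of what such headers might look like; the field names here are illustrative, not from any particular broker:

import json
import uuid

def new_step_message(workflow_id, step, payload, reply_to=None):
    return {
        "headers": {
            "message_id": str(uuid.uuid4()),        # changes on every publish
            "workflow_id": workflow_id,             # ties all steps of one workflow together
            "correlation_id": str(uuid.uuid4()),    # echoed back in the response to pair request and response
            "reply_to": reply_to,                   # channel the receiver should respond on
            "step": step,
        },
        "body": payload,
    }

request = new_step_message(workflow_id=str(uuid.uuid4()),
                           step="transcode",
                           payload={"video": "upload-42"},
                           reply_to="transcode.responses")
print(json.dumps(request, indent=2))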

What are the issues to consider during implementation of messaging?

Message Grouping -
If there is a dependency between messages, then it is best to group them together as one message. Reason - when we have multiple consumers receiving/retrieving messages, there is usually no guarantee as to which consumer will process which message.
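
For example, if a few operations must be applied together by the same consumer, one sketch is to batch them into a single message instead of publishing them independently (the field names are illustrative):

# Instead of three separate publishes that may land on different consumers,
# group the dependent steps into one message so a single consumer handles them together.
grouped_message = {
    "order_id": "ord-1001",
    "operations": [
        {"op": "create_order", "items": ["sku-1", "sku-2"]},
        {"op": "reserve_stock", "items": ["sku-1", "sku-2"]},
        {"op": "charge_card", "amount": 42.50},
    ],
}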

Message Ordering -
Another issue to think about: the order of messages may not be guaranteed. Even if we get ordering at the transport layer, there is no guarantee that the ordering will be preserved at the application layer. It is good to check whether the requirement can be addressed via priority queues.
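
One sketch, assuming the sender stamps each message with a sequence number: the consumer buffers out-of-order arrivals in a priority queue (a heap) and releases them only in sequence.

import heapq

def reorder(messages):
    """Yield messages in sequence order, buffering any that arrive early."""
    pending = []                        # min-heap keyed on sequence number
    next_seq = 1
    for msg in messages:
        heapq.heappush(pending, (msg["seq"], msg))
        while pending and pending[0][0] == next_seq:
            yield heapq.heappop(pending)[1]
            next_seq += 1

arrivals = [{"seq": 2, "body": "b"}, {"seq": 1, "body": "a"}, {"seq": 3, "body": "c"}]
print([m["body"] for m in reorder(arrivals)])       # ['a', 'b', 'c']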

Idempotency -
Many brokers provide an at-least-once delivery guarantee, which means we can receive duplicate messages. There are two ways to handle them (both sketched below).
  1. One option is to have a duplicate detection mechanism on the receiver side.
  2. Another way is to make the message processing idempotent, i.e., if a duplicate message is processed, the system state remains unchanged.
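
Minimal sketches of both options, using an in-memory set and dict (a real system would need durable storage for these):

# Option 1: duplicate detection - remember which message ids were already processed.
processed_ids = set()
account_balances = {}

def apply_update(msg):
    # Option 2: idempotent processing - "set balance to X" leaves the same state
    # no matter how many times it runs ("add X to balance" would not).
    account_balances[msg["account"]] = msg["balance"]

def handle_with_dedup(msg):
    if msg["message_id"] in processed_ids:
        return                          # duplicate: drop it silently
    processed_ids.add(msg["message_id"])
    apply_update(msg)

handle_with_dedup({"message_id": "m-1", "account": "acct-9", "balance": 100})
handle_with_dedup({"message_id": "m-1", "account": "acct-9", "balance": 100})   # duplicate, no effect
print(account_balances)                 # {'acct-9': 100}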

Poison Messages -
These are messages that cannot be handled by the consumer for a variety of reasons. They are poison because they wreak havoc on your consumers and break the message flow.

Say your receiver receives a message but cannot handle it because it is ill-formatted. The receiver fails, throwing an exception, and the message is returned to the queue. The next receiver gets it, and the same story repeats with that receiver and every other receiver as well. Your messaging system is no longer functioning.

So it is important to detect poison messages and discard them. One way to detect them is to track how many times a message has been delivered and set a cutoff threshold. After the threshold is crossed, you just throw the message out.
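
A sketch of the threshold idea, assuming the broker (or our own wrapper) exposes a per-message delivery count. Here rejected messages are parked in a separate list rather than silently dropped, which is one common variation:

MAX_DELIVERIES = 3
dead_letters = []                       # parking lot for messages we have given up on

def requeue(msg):
    print("returning to queue:", msg["body"])

def safe_handle(msg, handler):
    if msg["delivery_count"] > MAX_DELIVERIES:
        dead_letters.append(msg)        # stop the poison message from cycling forever
        return
    try:
        handler(msg["body"])
    except Exception:
        msg["delivery_count"] += 1      # a real broker tracks this on redelivery
        requeue(msg)

safe_handle({"body": "<ill formatted>", "delivery_count": 4}, handler=print)
print(len(dead_letters))                # 1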

Message Expiration -
Sometimes senders might send commands that are applicable only for a limited time. It is good to keep this in mind when processing messages on the receiver side.
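
A small sketch, assuming the sender stamps an expires_at timestamp on the message:

import time

def handle_if_not_expired(msg, handler):
    if msg.get("expires_at") is not None and time.time() > msg["expires_at"]:
        return                          # the command is stale: drop it instead of acting on it
    handler(msg)

handle_if_not_expired({"cmd": "scale-up", "expires_at": time.time() - 60}, handler=print)   # dropped
handle_if_not_expired({"cmd": "scale-up", "expires_at": time.time() + 60}, handler=print)   # processed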

Message Scheduling -
Sometimes a sender might want a command/message to be processed only at a certain time. It is good to make sure the receiver does not get its hands on that message until the scheduled time. Check whether the underlying message broker provides this capability.
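
If the broker does not offer delayed delivery natively, one sketch is for the consumer to check a deliver_after timestamp and put the message back until it is due (timestamps and names are illustrative):

import queue
import time

scheduled = queue.Queue()

def consume_scheduled(handler):
    msg = scheduled.get()
    delay = msg.get("deliver_after", 0) - time.time()
    if delay > 0:
        scheduled.put(msg)              # not due yet: put it back and wait a little
        time.sleep(min(delay, 1.0))
        return
    handler(msg)

scheduled.put({"cmd": "nightly-report", "deliver_after": time.time() + 2})
consume_scheduled(handler=print)        # too early: the message goes back on the queue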

Note: The above is obviously not a complete list, but these certainly are among the most common issues.

On making the messaging solution easier to monitor, debug & support...

So, how can we effectively manage a messaging system that is distributed?
Having a distributed & loosely coupled system is great. But it also creates some challenges, like -
  1. What if we want to check whether all the components are running correctly?
  2. Are the messages flowing through, or is any component backing up?
  3. What is the throughput? etc.

Also, sometimes we need to make adjustments to the system while it is running. The adjustment could be updating the configuration of a component, enabling a subscriber, etc. A good way to do these management activities is to leverage the messaging system itself, for example, by publishing a control message and letting components make the necessary changes in response to it.

While the above sounds good, there are a couple of caveats. For example, what if the message queues have a large backlog and our control message is way behind in the queue? One way to solve this is to use priority messages.

We might still run into problems. For example, what if the message queues are backed up and the broker is not accepting any new messages? Our control message could be exactly the purge-queue message that would fix the situation. So a better option is to have a separate channel for control messages.

In short, we make each component in the system connect to two channels -
  1. Regular Messages Channel
  2. Control Message Channel

We can then utilize the Control Channel to send
  1. Configuration Messages. Examples: Timeouts, Property files, Membership Info etc.
  2. Heartbeat Messages. Example: For liveness etc.
  3. Stats Messages. Examples: Average throughput, Processing Latencies etc.
  4. Live Console. For example, to aggregate the above and display it in a console.
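
A sketch of a component wired to both channels, with in-memory queues standing in for the broker and made-up message shapes. The control messages are drained on their own thread, so they get handled even when the regular queue is backed up:

import queue
import threading
import time

regular_channel = queue.Queue()
control_channel = queue.Queue()
config = {"timeout_sec": 30}

def control_reply(payload):
    print("control reply:", payload)

def control_loop():
    while True:
        ctrl = control_channel.get()
        if ctrl["type"] == "config":
            config.update(ctrl["settings"])             # e.g., timeouts, membership info
        elif ctrl["type"] == "heartbeat":
            control_reply({"alive": True})              # prove liveness
        elif ctrl["type"] == "stats_request":
            control_reply({"queue_depth": regular_channel.qsize()})

def work_loop():
    while True:
        msg = regular_channel.get()
        print("processing with timeout", config["timeout_sec"], ":", msg)

threading.Thread(target=control_loop, daemon=True).start()
threading.Thread(target=work_loop, daemon=True).start()

control_channel.put({"type": "config", "settings": {"timeout_sec": 5}})
regular_channel.put({"body": "regular work"})
time.sleep(0.5)                         # give the daemon threads a moment before the script exits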

How can we effectively investigate issues in a messaging system that is distributed?


The ability to run validations on messages as they hop between components is a fairly useful capability to have for debugging. On the other hand, we may not want to run these validations all the time, as they will slow down the system. Another scenario is having the ability to log messages to a trace file for troubleshooting.

So how do we do the above? Basically, we create a context-based router that can be controlled via the Control Message Channel. The idea is that this router routes incoming messages through additional channels (for validation, tracing, etc.) when it receives a control message telling it to, and routes directly to the destination channel the rest of the time.
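
A sketch of such a router, again with in-memory queues and a simple flag toggled by control messages (the channel and command names are illustrative):

import queue

destination_channel = queue.Queue()
validation_channel = queue.Queue()      # extra hop used only while debugging
debug_mode = False                      # flipped by control messages

def on_control_message(ctrl):
    global debug_mode
    if ctrl.get("command") == "enable_validation":
        debug_mode = True
    elif ctrl.get("command") == "disable_validation":
        debug_mode = False

def route(msg):
    if debug_mode:
        validation_channel.put(msg)     # validators/tracers consume here, then forward onward
    else:
        destination_channel.put(msg)    # normal fast path straight to the destination

on_control_message({"command": "enable_validation"})
route({"body": "order-17"})
print(validation_channel.qsize(), destination_channel.qsize())    # 1 0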



Also, there is nothing stopping us from having multiple context-based routers, where all the routers are controlled with the same control message. Best not to get carried away though :-)

How do we inspect messages in queues?
This is not an issue if we are using publish-subscribe channels, since every subscriber gets its own copy of each message - an inspector can simply subscribe. On the other hand, it will be an issue if we are doing event-driven processing and leveraging competing consumers for load balancing, because each message is delivered to only one of the consumers.

One alternative is to make the sender publish the messages for inspection on a separate channel. This is not an attractive option, as it requires a lot of changes to the code.

Another alternative is to use a peek method that allows a component to inspect a message without consuming it. Well, this approach also has a problem: once a consumer receives the message, you can no longer inspect it.

So how do we solve it? The solution is basically to add a recipient list to the channel so that all incoming messages on that channel are also posted to secondary channel(s).
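
A sketch of that idea: the channel keeps a recipient list, and every incoming message is delivered to the primary queue plus any secondary inspection queue that has been attached (in-memory queues as stand-ins):

import queue

primary = queue.Queue()                 # competing consumers keep reading from here as before
recipients = [primary]                  # the channel's recipient list

def attach_inspector():
    tap = queue.Queue()                 # secondary channel for inspection or tracing
    recipients.append(tap)
    return tap

def post(msg):
    for chan in recipients:             # every incoming message goes to every recipient
        chan.put(msg)

inspector = attach_inspector()
post({"body": "payment-99"})
print(primary.get(), inspector.get())   # both channels see the same message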



How can we effectively trace the flow of messages in a loosely coupled messaging system?
Tracking the flow of messages through the system seems simple on the surface. It is not. We cannot use the message-id, as the message-id changes each time a message is re-published.

Similarly, using an upper-layer id like the request-id will not help; the request-id does not tell us which components have processed the message. The solution is to attach a Message History to the message. The message history is basically a list of all component ids that the message has passed through so far. Before publishing a message, every component adds its unique id to the message history. We store the message history as part of the message header.

Finally, another reason we may need to capture message history in a message is to avoid infinite loops. For example, say whenever there is a database update, we publish a message and all subscribers update their own database entries - and each of those updates gets published in turn.

You can see how the above can create an infinite loop. By having each subscriber check whether it is already in the message history, and process the message only if it is NOT, we can avoid creating an infinite loop.
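
A sketch of the message history header and the loop check, with component ids as plain strings (the names are illustrative):

def publish(msg, component_id, deliver):
    history = msg.setdefault("headers", {}).setdefault("history", [])
    history.append(component_id)        # every component records itself before publishing
    deliver(msg)

def apply_to_local_db(msg):
    print("applying update:", msg["body"])

def on_db_update(msg, component_id, deliver):
    history = msg.get("headers", {}).get("history", [])
    if component_id in history:
        return                          # we already handled this update: break the loop
    apply_to_local_db(msg)
    publish(msg, component_id, deliver) # re-publish so other replicas hear about it

msg = {"body": {"row": 7, "value": "new"}}
publish(msg, "service-A", deliver=lambda m: None)       # service-A announces its update
on_db_update(msg, "service-B", deliver=lambda m: None)  # service-B applies it and re-publishes
on_db_update(msg, "service-B", deliver=lambda m: None)  # the echo comes back and is skipped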

Long post, but I hope it is useful. Please feel free to share your comments.
