And now my third blog post on replication. I hope you liked the last two. This will be the last, at least until some new replication changes are made (writing these posts has given me some great ideas). In this post I am going to talk about Diffusion’s implementation of replication using Hazelcast as a provider. The implementation is abstracted away from the user and does not affect how you use replication (except for design decisions), but it’s nonetheless interesting. Enjoy!
So, Hazelcast. Hazelcast is an in-memory datagrid written in Java. It is replicated and distributed, which gives it failure tolerance. We initially started looking at Hazelcast because it exposes a JCache API. The thinking was that we could swap Hazelcast out for another JCache implementor, making it easy to integrate with other providers. In the end we created a very Hazelcast-specific implementation.
Hazelcast stores distributed objects. These objects can be accessed across the entire cluster and do not need to be localised on a single node. Each object has a unique name: different nodes that use the same name access the same object, and the object is created implicitly when accessed.
Hazelcast stores data in partitions. By default there are 271 partitions, distributed across the cluster. A partition is localised to one node and is backed up on another node, giving failure tolerance. Each object name or other key maps onto one partition. This acts as a distributed hash table: each node can look up the partition for a key and the node the partition is on. This allows every node to know the location of a value referred to by a key. An object can exist in a single partition, identified by the object name (like an AtomicLong), or across multiple partitions, accessed through multiple keys (like a Map).
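To make the lookup concrete, here is a minimal sketch of the key-to-partition-to-node idea. It is not Hazelcast's code: as far as I know Hazelcast hashes the serialised form of the key rather than calling hashCode(), and the round-robin assignment of partitions to nodes below is purely an assumption for illustration. The class name PartitionTableSketch is hypothetical.

```java
import java.util.List;

// Illustrative only: a local table mapping each partition to an owning node.
final class PartitionTableSketch {
    static final int PARTITION_COUNT = 271; // Hazelcast's default partition count

    private final String[] ownerByPartition = new String[PARTITION_COUNT];

    PartitionTableSketch(List<String> nodes) {
        // Assumption: round-robin ownership. A real cluster computes and rebalances this.
        for (int p = 0; p < PARTITION_COUNT; p++) {
            ownerByPartition[p] = nodes.get(p % nodes.size());
        }
    }

    // Every node applies the same function, so every node derives the same partition.
    static int partitionFor(Object key) {
        // floorMod keeps the result in [0, PARTITION_COUNT) even for negative hash codes.
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Two lookups: key to partition, then partition to node.
    String ownerOf(Object key) {
        return ownerByPartition[partitionFor(key)];
    }
}
```

Because the function and the table are the same everywhere, any node can work out where a key lives without asking another node.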
The serialisation built into Java is very generic. It includes a description of the class along with the instance data, which creates a large binary representation. By implementing custom serialisers, performance can be improved and the amount of data that must be sent to Hazelcast reduced. Hazelcast provides its own framework for custom serialisation, and Diffusion also uses a custom serialisation framework (well, two, but we’ll only talk about one). It’s a common problem that has been solved many times, so we need a way to reuse the Diffusion serialisers inside Hazelcast to avoid duplicating work.
Java vs custom serialisation
The Diffusion serialisation framework is very similar to Hazelcast’s StreamSerializer. There are three main differences. Hazelcast uses integers to look up the serialiser, while Diffusion uses a Class object. Hazelcast uses its own ObjectDataInput and ObjectDataOutput objects to read and write, while Diffusion uses the familiar InputStream and OutputStream. And the methods that ObjectDataInput and ObjectDataOutput provide for reading and writing primitives are provided to Diffusion as static methods on a utility class. Because the two frameworks are so similar, it should be simple to convert from one to the other.
Diffusion uses Hazelcast’s ByteArraySerializer and the standard library’s byte array streams to wrap its serialisers in a way that Hazelcast can understand. A ByteArraySerializer implementation wraps the incoming byte array in a ByteArrayInputStream when reading, unpacks the byte array from a ByteArrayOutputStream when writing, and delegates to the Diffusion serialiser in both cases. It also maps the serialiser to an integer. This implementation is all that is needed to expose any of Diffusion’s serialisers to Hazelcast.
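The wrapping can be sketched roughly as follows. This is not Diffusion's actual class. StreamSerialiser is a hypothetical stand-in for a Diffusion serialiser, and ByteArrayCodec is a local copy of the shape of Hazelcast's ByteArraySerializer (getTypeId, write, read) so that the example compiles without Hazelcast on the classpath. StringSerialiser exists only to exercise the adapter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-in for a Diffusion stream-based serialiser.
interface StreamSerialiser<T> {
    void write(OutputStream out, T value) throws IOException;
    T read(InputStream in) throws IOException;
}

// Local stand-in with the same shape as Hazelcast's ByteArraySerializer.
interface ByteArrayCodec<T> {
    int getTypeId();
    byte[] write(T value) throws IOException;
    T read(byte[] buffer) throws IOException;
}

// Adapts any stream-based serialiser to the byte-array form.
final class StreamToByteArrayAdapter<T> implements ByteArrayCodec<T> {
    private final int typeId;
    private final StreamSerialiser<T> delegate;

    StreamToByteArrayAdapter(int typeId, StreamSerialiser<T> delegate) {
        this.typeId = typeId;
        this.delegate = delegate;
    }

    @Override
    public int getTypeId() {
        return typeId; // the integer the grid uses to find this serialiser
    }

    @Override
    public byte[] write(T value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        delegate.write(out, value);   // the delegate only ever sees an OutputStream
        return out.toByteArray();     // unpack the bytes for the grid
    }

    @Override
    public T read(byte[] buffer) throws IOException {
        // Wrap the grid's bytes so the delegate only ever sees an InputStream.
        return delegate.read(new ByteArrayInputStream(buffer));
    }
}

// Example delegate used only to exercise the adapter.
final class StringSerialiser implements StreamSerialiser<String> {
    @Override
    public void write(OutputStream out, String value) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeUTF(value);
        data.flush();
    }

    @Override
    public String read(InputStream in) throws IOException {
        return new DataInputStream(in).readUTF();
    }
}
```

One adapter class is enough for every serialiser, because the only things that vary are the delegate and the integer it is registered under.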
As mentioned, Hazelcast uses integers to identify custom serialisers. This could cause a problem if you need to register your own custom serialisers and you don’t know which integers are free, so there needs to be some way of making sure that the custom serialisers don’t replace each other. You can change the offset used by Diffusion’s custom serialisers by setting the system property diffusion.hazelcast.serialiser.offset to the number you want Diffusion to start numbering them from.
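As a rough illustration of how an offset keeps two sets of serialiser IDs apart, the sketch below reads the system property and adds it to a per-serialiser index. Only the property name comes from this post. The class name, the default of 1000 and the index-based numbering are assumptions made for the example, not Diffusion's real values.

```java
// Illustrative only: derives serialiser type IDs from a configurable offset.
final class SerialiserIdSketch {
    static final String OFFSET_PROPERTY = "diffusion.hazelcast.serialiser.offset";
    static final int ASSUMED_DEFAULT_OFFSET = 1000; // hypothetical default

    // index is the position of the serialiser in some fixed registration order (assumed).
    static int typeIdFor(int index) {
        // Integer.getInteger reads a system property and falls back to the default.
        int offset = Integer.getInteger(OFFSET_PROPERTY, ASSUMED_DEFAULT_OFFSET);
        return offset + index;
    }
}
```

Starting the JVM with -Ddiffusion.hazelcast.serialiser.offset=5000 would, in this sketch, move the whole block of IDs to start at 5000, leaving the lower numbers free for your own serialisers.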
Session replication implementation
Since 5.1, Diffusion uses the Hazelcast SPI to define its own data structures and operations. It’s the SPI that really allows Hazelcast to surpass a basic JCache implementation. It’s the difference between using a map and defining a data structure that uses a map internally. It allows single operations to be applied to complex data structures in an ordered and atomic way.
The sessions are stored in a custom Hazelcast distributed object, and each node uses this object to get information about all sessions. Like a map, the sessions are spread across all partitions according to session ID, distributing the load across all the nodes, and of course they are backed up.
Custom distributed objects are accessed through a proxy that communicates with a service instance. The service should implement interfaces like RemoteService and ManagedService and is identified by name. A service instance is hosted on a node and is responsible for managing all data related to the service, in all partitions on that node. Service instances are started and stopped with the Hazelcast node. Proxies are obtained by calling the getDistributedObject method with the name of the service and the name of the object. The proxy instance is created by the service and should present an API in the form of an interface (extending DistributedObject) and be implemented by a class extending AbstractDistributedObject.
The proxy is passed the internals of Hazelcast needed to invoke custom operations. Operations should be created when methods are called on the proxy. The OperationService is used to invoke the operation with the service name and partition. The operation is then serialised and sent to the node the partition is on, where it runs and can access the service instance and the methods it exposes. Operations can return a result to the invoker, allowing value retrieval. This is the basis of Hazelcast’s internode communication.
Custom objects in Hazelcast
When a new session connects, a method is called on a proxy, which creates an operation to add the session. This operation is sent and applied to the partition the session ID maps to. A session data object is created inside the service running on that node to contain all the immutable information. Then, every time the session makes a new topic selection, another operation is used to add the selection to a list and update a modification version. The modifications to the list and the version happen atomically. If the provided Hazelcast objects were used, this could not be done without performing multiple operations that could not be synchronised over, introducing race conditions. That would affect the correctness of the implementation, because the modification version is used to invalidate cached information when a session fails back over, and acting on stale cached information could revert some topic selections.
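The value of a single operation can be shown with a small model. The sketch below is not the Diffusion service and does not use the Hazelcast SPI. It assumes that operations for one partition are applied one at a time, and models that with a single-threaded executor. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: one partition's worth of session data.
final class SessionPartitionSketch {
    static final class SessionData {
        final List<String> selections = new ArrayList<>();
        long version; // bumped on every change to selections
    }

    // Only ever touched from the partition thread, so no locking is needed.
    private final Map<String, SessionData> sessions = new HashMap<>();

    // Stand-in for a partition thread: operations are applied strictly in order.
    private final ExecutorService partitionThread =
        Executors.newSingleThreadExecutor(task -> {
            Thread thread = new Thread(task, "partition-sketch");
            thread.setDaemon(true); // do not keep the JVM alive for the sketch
            return thread;
        });

    // One operation changes the list and the version together.
    Future<Long> addSelection(String sessionId, String selector) {
        return partitionThread.submit(() -> {
            SessionData data = sessions.computeIfAbsent(sessionId, id -> new SessionData());
            data.selections.add(selector);
            return ++data.version;
        });
    }

    // Reads also go through the partition thread, so they never see a half-applied update.
    Future<List<String>> selections(String sessionId) {
        return partitionThread.submit(() -> {
            List<String> copy = new ArrayList<>();
            SessionData data = sessions.get(sessionId);
            if (data != null) {
                copy.addAll(data.selections);
            }
            return copy;
        });
    }
}
```

With two separate calls, one to append to a distributed list and one to increment a distributed counter, another caller could observe the new list with the old version. Folding both changes into one operation removes that window.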
Each interaction with a session (creating one, recovering selections, etc.) uses a different, single operation targeting a partition. If a server fails, any sessions that do not fail over within a timeout need to be removed. Unlike other operations, which are applied to specific sessions, the operation to clean up after a session is applied to every partition in the cluster individually.
It’s also necessary to handle events triggered by Hazelcast, like repartitioning. Whenever a new node joins a Hazelcast cluster, the partition-to-node mappings are recalculated and partitions are moved between nodes. A service needs to implement MigrationAwareService to handle migration events. A key part of this is providing an operation that can apply the migrated data to the target node. This operation should set up the data of the partition on the target node, effectively merging in the new partition.
Topic replication implementation
Topic replication works slightly differently to session replication. Instead of using custom objects, it uses the objects provided by Hazelcast. These are implemented in the same way as custom objects, using proxies and services. Unlike session replication, where knowledge about a session needs to be somewhere in the cluster, topic replication requires all nodes to know about topics.
On each Diffusion server a listener is registered on the topic tree to receive notifications of created and removed topics. When a topic is added to the tree, Diffusion attempts to bind the topic path it belongs to. It does this by trying to update a distributed map of topic paths to servers. It also stores the TopicDetails in a different distributed map of topic names to details. If the binding succeeds, or the path is already bound locally, the server notifies the cluster of updates; otherwise the server listens for them. The other nodes listen to the distributed map of details for new entries, which they use to create a copy of the topic. At least, this is true of stateless topics; the creation of topics that use topic data is delayed until they have current content. When a new entry specifies publishing topic data, the receiving node requests the current load message. The node that currently holds the binding for the topic path responds to this request. When the response is received, the topic is created with the latest value.
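The binding step can be sketched with an ordinary ConcurrentMap, which is the interface Hazelcast's distributed map also implements. This is an assumption about how the update could be done, using putIfAbsent, and not a description of Diffusion's actual code. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentMap;

// Illustrative only: decides whether this server owns a topic path.
final class TopicBindingSketch {
    private final ConcurrentMap<String, String> pathToServer; // shared by every server
    private final String localServer;

    TopicBindingSketch(ConcurrentMap<String, String> pathToServer, String localServer) {
        this.pathToServer = pathToServer;
        this.localServer = localServer;
    }

    // Returns true if this server should send updates for the path,
    // false if it should listen for updates from the server that holds the binding.
    boolean tryBind(String topicPath) {
        // putIfAbsent is atomic: exactly one server wins the first bind for a path.
        String owner = pathToServer.putIfAbsent(topicPath, localServer);
        return owner == null || owner.equals(localServer);
    }
}
```

For example, if servers A and B share one map and both call tryBind on the same path, whichever call lands first returns true and the other returns false, while a repeat call from the winner still returns true.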
On the server with the binding, topic publications are listened for through the multiplexer as broadcast message events. These messages are then broadcast to all the Hazelcast nodes using Hazelcast Topics. These messages are listened for but not stored. When a node receives a message it looks up the topic and either publishes the message on it or updates the topic data.
When a Diffusion server joins a cluster, it attempts to acquire a lock within a short timeout. If the lock is acquired, the server becomes the master. Being the master only matters for failover. When a node fails, Hazelcast triggers a cluster membership listener on each node. If the failed server was the master, Hazelcast releases the lock and all the servers compete to acquire it and become the new master. The node that is now the master takes over any bindings of the failed node and becomes responsible for sending updates for topics on the bound paths.
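The election amounts to a timed lock attempt, which can be sketched against the standard java.util.concurrent.locks.Lock interface (as far as I know, the distributed lock in Hazelcast 3.x extends this interface). The class below is hypothetical and leaves out the membership listener and the takeover of bindings.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

// Illustrative only: whoever holds the shared lock is the master.
final class MasterElectionSketch {
    private final Lock clusterLock; // assumed to be shared by every server
    private volatile boolean master;

    MasterElectionSketch(Lock clusterLock) {
        this.clusterLock = clusterLock;
    }

    // Called on joining the cluster, and again whenever a member is reported as failed.
    boolean tryBecomeMaster(long timeout, TimeUnit unit) throws InterruptedException {
        // tryLock gives up after the timeout, so a server never blocks waiting to be master.
        if (!master && clusterLock.tryLock(timeout, unit)) {
            master = true;
        }
        return master;
    }

    boolean isMaster() {
        return master;
    }
}
```

A server that loses the race simply stays a non-master and tries again the next time the membership listener fires.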
Integrating with non-Diffusion nodes
Diffusion sets up Hazelcast with custom serialisers and data structures. If you add a node that is not a Diffusion server to the cluster, then when Hazelcast tries to move partitions to it, the new node won’t know what to do with the Diffusion data. Since any node can host any partition, the node configuration needs to be homogeneous. The serialisers, data structures and services need to be configured on every node you add to the cluster. Diffusion configures these programmatically. To set up your nodes to work with Diffusion, get in touch and we’ll talk you through exactly what is needed.
Fun! I hope you have gotten an idea of how much easier using Hazelcast can make creating a distributed, replicated datagrid. The value of a tool is always what it can add and what it can take away. Hazelcast does not add value by moving data, Diffusion can do that. It adds value by giving partitioning, backups, cluster discovery and a clean abstraction around interacting with distributed objects. It adds value by taking away the need to redo all this inside of Diffusion. The Diffusion server takes as much advantage as it can from Hazelcast and there is nothing stopping you from doing the same. Check out our developer pages for more on Diffusion or download a trial free.