One of the key abilities in Diffusion is the ability to update the value of topics. Updating allows new values to be provided for topics. In Diffusion 6.2 a new update API has been introduced. This new API introduces new ways of coordinating multiple updaters, handling missing topics and a safer interface.
There are five main changes that are being introduced in 6.2:
- Stateless set
- Conditional updates
- Creation of missing topics through update
- Optimistic, non-exclusive update streams
The existing update API has been deprecated in favor of the new API. The functionality in the existing API that was previously deprecated has been removed. The update API hadn’t been changed for some time and still used callbacks instead of
CompleteableFutures, so this was a good opportunity for us to integrate many recent changes to Diffusion.
In Java, functionality is split up into
Feature objects. A new
TopicUpdate feature has been added to encapsulate the new update API. The
Topics feature extends this so that the new functionality can be accessed in two ways.
It is intended that all topic related functionality will be consolidated into the
Topics feature. The API has grown too fragmented and we want to make it easier to find related features. Previously updating was done using the
TopicUpdateControl feature which has now been deprecated.
As part of these changes, a static factory method for creating topic specifications has been added to the
Diffusion enum. This is a common location for several factories and reflects the fact that a
TopicSpecification needs to be created when using different features.
set method has been introduced for performing simple, value-based update operations. In its simplest form, this method is equivalent to using a non-exclusive value updater, but it supports multiple alternatives with additional functionality. This method replaces the current topic value with a new value.
The set method is accessed through the
TopicUpdate feature. It requires the path to update, the type of value and the value. The type of the topic being updated must match the type of value it is being set to. This prevents topics being given values that don’t match their type which is possible with existing APIs.
The primitive topic types, int64, double and string support being set to null but other types must be set to a value. It’s important to know that from 6.2 if a primitive topic is set to null, new subscribers will not be notified of the topic value until it changes to a non-null value. This only affects int64, double and string because those are the only topic types that use language native representations of null.
session .feature(TopicUpdate.class) .set("a/int64/topic", Long.class, 5L) .thenAccept(x -> System.out.println("Topic updated"));
The set method returns always returns a
CompletableFuture. There are several ways this can fail, such as the topic being missing or not having permission. Any failure will cause the future to be completed with an exception.
Conditional update supports coordination between sessions. It allows a constraint to be passed with a topic update. The topic update is only applied if the constraint is satisfied.
There are several different coordination patterns that this supports. Constraints can specify session locks to ensure exclusive access to a topic. Constraints can specify values to ensure an updating session knows the latest value and is not interfering with work done by other sessions.
A factory for constraints can be obtained from the
final UpdateConstraint.Factory constraints = Diffusion.updateConstraints(); final UpdateConstraint constraint = constraints.value(5L);
Constraints support simple composition. You can compose constraints if they test different things. A session lock is separate from other session locks, so many of them can be composed together. A topic can only have one value, so constraints on values can’t be composed with each other. Constraints on a topic value and session locks can be composed together.
final UpdateConstraint constraint = constraints .value(5L) .and(constraints.locked(sessionLock));
A conditional update can be applied using stateless set operations. Using a constraint on the value, you can ensure that a topic is changed from one value to another. This allows multiple sessions to cooperate on updating a topic.
session .feature(TopicUpdate.class) .set("a/int64/topic", Long.class, 6L, constraints.value(5L)) .thenAccept(x -> System.out.println("Topic incremented"));
This returns a
CompletableFuture that can fail with a
UnsatisfiedConstraintException if the constraint is not satisfied and the topic is not updated.
Constraints on values can be specified in several different ways. A value can be specified as an exact binary value, or as being absent, and the structure of JSON objects can be partially specified. You can specify the value at a certain position within a JSON value using a JSON pointer. You can also specify that a certain position is not present in the object. This is different from a position being set to null.
session .feature(TopicUpdate.class) .set( "a/json/topic", JSON.class, newValue, constraints.jsonValue().with("/id", 6192L).without("/cancelled")) .thenAccept(x -> System.out.println("Topic updated"));
It’s possible to use constraints to ensure that updates are only applied if a session lock is held. This allows sessions to obtain exclusive access to a topic. This differs from using session locks without constraints by preventing race conditions when locks are being released and acquired about the same time that updates are being applied.
final TopicUpdate update = session.feature(TopicUpdate.class); session .lock("a/lock") .thenApply(lock -> constraints.locked(lock)) .thenCompose(constraint -> update.set("a/path", Long.class, 6L, constraint)) .thenAccept(x -> System.out.println("Topic updated while lock held"));
Creation of missing topics
The new API allows topic specifications to be passed with a topic update. If no topic exists at the path you are trying to update, the specification is used to create the topic and then apply the update to it. This can be done with the stateless
session .feature(TopicUpdate.class) .addAndSet( "a/int64/topic", Diffusion.newTopicSpecification(TopicType.INT64), Long.class, 5L) .thenAccept(creationResult -> System.out.println("Topic was " + creationResult));
This returns a
CompletableFuture that completes with a
TopicCreationResult value that indicates if a new topic was created or an existing one updated.
If a topic exists but the specification it was created with differs from the one provided with the update, the update will fail and the future will be completed exceptionally with
addAndSet method requires both the modify_topic and update_topic permissions for the path for anything to happen. If you are missing the modify_topic permission and the topic exists, the topic won’t be updated.
A constraint can be passed, just like with
set, to make the topic creation and update conditional.
final TopicUpdate update = session.feature(TopicUpdate.class); session .lock("a/path") .thenApply(lock -> constraints.locked(lock)) .thenCompose(constraint -> update.addAndSet( "a/path", Diffusion.newTopicSpecification(TopicType.INT64), Long.class, 0L, constraint)) .thenAccept(creationResult -> System.out.println("Topic was " + creationResult));
Optimistic, non-exclusive update streams
Previous update APIs have provided exclusive and non-exclusive updaters. Exclusive updaters prevent other sessions updating a branch of the topic tree. Non-exclusive updaters allow other sessions to update the same topics. This means exclusive updaters can send updates as binary deltas, but non-exclusive updaters can’t. To send an update as a binary delta, the updater needs to know the last value to generate the differences to the new value.
Optimistic, non-exclusive update streams are a stateful type of updater that does not prevent other sessions from applying updates but detects when it no longer knows the last value. This allows it to send binary deltas without preventing updates from other sources. Update streams are created for specific topics instead of branches; this is needed to detect changes to the topic they are updating. An update stream is created for a specific value type and it will check that the type of the topic matches the value type.
final UpdateStream updateStream = session .feature(TopicUpdate.class) .createUpdateStream("a/int64/topic", Long.class); updateStream .set(7L) .thenAccept(x -> System.out.println("Topic updated"));
Update streams are created immediately without interacting with the server. On the first update stream operation, the stream will be validated with the server. From this point on, it will be able to detect changes to the topic it is updating.
If something else changes the topic, the update stream will be invalidated and will stop accepting new values. There can only be one valid update stream at a time. When a new update stream is validated with the server, any previous update stream for the topic is invalidated. Once a stream is invalidated, any attempt to use it results in an
Once the update stream has been validated, it can use the
get method to access the last known value. So long as the update stream has not been invalidated, this must be the current value of the topic. It can be used to help derive the next value for the topic.
updateStream .set(updateStream.get() + 1) .thenAccept(x -> System.out.println("Topic incremented"));
To give more control over the life cycle of the update stream, the
validate method validates the update stream with the server without setting a new value. Eager validation gives you control over when the server is notified of the stream and the point from when it can be invalidated.
final UpdateStream updateStream = session .feature(TopicUpdate.class) .createUpdateStream("a/int64/topic", Long.class); updateStream .validate() .thenAccept(x -> System.out.println("Stream validated."));
validate method returns a
CompletableFuture that acts in the same way as the future returned by the
Update streams can be created with a constraint. The constraint is checked when the update stream is validated: either the first call to
validate. The constraint is only checked once. If a session lock is used, the lock does not need to be held to update the topic.
final UpdateStream updateStream = session .feature(TopicUpdate.class) .createUpdateStream("a/int64/topic", Long.class, constraints.locked(lock)); updateStream.validate().get(); lock.release(); updateStream.update(7L).get();
Update streams can be created with a topic specification. This allows them to create the topic if it’s missing when the update stream is validated, either the first call to
final UpdateStream updateStream = session .feature(TopicUpdate.class) .createUpdateStream( "a/int64/topic", Diffusion.newTopicSpecification(TopicType.INT64), Long.class);
validate methods of an update stream return a future that completes with a result that indicates if the topic was created or updated. This is only relevant to update streams that are created using the method
createUpdateStream and are called with a
Update streams can be created with both a topic specification and a constraint. This combines the uses of update streams, topic creation and conditional operation.
final UpdateStream updateStream = session .feature(TopicUpdate.class) .createUpdateStream( "a/int64/topic", Diffusion.newTopicSpecification(TopicType.INT64), Long.class, constraints.locked(lock)) .validate();
The Diffusion Intelligent Data Platform manages, optimizes, and integrates data among devices, systems, and applications. Push Technology pioneered and is the sole provider of real-time delta data streaming™ technology that powers mission-critical business applications worldwide. Leading brands use Push Technology to fuel revenue growth, customer engagement, and business operations. The products, Diffusion® and Diffusion Cloud™, are available on-premise, in-the-cloud, or in a hybrid configuration, to fit the specific business and infrastructure requirements of the applications operating in today’s mobile obsessed, everything connected world. Learn how Push Technology can reduce infrastructure costs, and increase speed, efficiency, and reliability, of your web, mobile, and IoT application.