New Fetch API in Diffusion Release 6.2

27 Nov 18

Version 6.2 of the Diffusion Intelligent Data Platform introduces a major improvement in the form of the new fetch API.

In Diffusion terms, ‘fetch’ refers to the operation that allows a client to request the current state of a topic or topics without subscribing to those topics.

The new fetch API is a powerful interface for retrieving the values and specifications of topics. The API’s additional functionality is particularly useful for paging through large result sets.

While fetch has been a feature of the previous Diffusion client APIs, the new fetch API has powerful enhancements that expand and simplify client development as follows:

  • Topic specifications can now be retrieved, not just topic values.
  • The value returned is now typed to the topic’s data type, so the client does not need to know the topic type or perform data-type-specific conversion on the raw bytes returned.
  • The result set can now be filtered by topic selector and topic path range. This lets the caller bound how large the result set can be and avoid very large result sets, which can be inefficient.

The 6.2 release eliminates legacy topic types (RECORD, SINGLE_VALUE and STATELESS) and indeterminate topic ordering. Topics are now stored in path-sorted order within their parents, which allows a consistent ‘range’ to be defined within the topic tree and so enables paging.

The old fetch API has now been deprecated and is replaced by the enhanced API.

The examples below show how you can use the new fetch API from within a Java client. The new fetch API will also be available in all other client API variants.

A simple fetch request

As in the previous API, the fetch operation is primarily constrained by specifying a topic selector. A caller may only retrieve topics for which they have ‘select’ permission to the path prefix of the specified selector, and will only receive results for topics within the selection for which they have ‘read’ permission.

What is new is that the fetch request is now defined using a fluent ‘fetch request’ builder. An instance of such a builder is obtained using the fetchRequest method on the Topics feature.

    Topics topics = session.feature(Topics.class);
    FetchRequest<Void> request = topics.fetchRequest();

This creates a new instance of the simplest type of request, which will return the path and topic type of each topic selected. You can use methods on this request to create new request instances with additional constraints. A request instance is immutable and can be reused.
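The immutability described above can be pictured with a small stand-in class. This is only a sketch of the general pattern, not the Diffusion FetchRequest implementation: SketchRequest, its fields and its describe method are hypothetical names chosen for illustration.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for an immutable fluent request.
 * It is NOT the Diffusion FetchRequest; names and fields are illustrative only.
 */
final class SketchRequest {
    private final String valueType; // null means "no values requested"
    private final int limit;        // Integer.MAX_VALUE means "no limit"

    private SketchRequest(String valueType, int limit) {
        this.valueType = valueType;
        this.limit = limit;
    }

    /** The simplest request: no values, no limit. */
    static SketchRequest create() {
        return new SketchRequest(null, Integer.MAX_VALUE);
    }

    /** Returns a NEW request; the receiver is left unchanged. */
    SketchRequest withValues(String type) {
        return new SketchRequest(Objects.requireNonNull(type), limit);
    }

    /** Returns a NEW request; the receiver is left unchanged. */
    SketchRequest first(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        return new SketchRequest(valueType, n);
    }

    String describe() {
        String shownLimit = (limit == Integer.MAX_VALUE) ? "none" : String.valueOf(limit);
        return "values=" + valueType + ", limit=" + shownLimit;
    }

    public static void main(String[] args) {
        SketchRequest base = SketchRequest.create();
        SketchRequest paged = base.withValues("String").first(20);
        System.out.println(base.describe());  // prints values=null, limit=none
        System.out.println(paged.describe()); // prints values=String, limit=20
    }
}
```

Because every method returns a fresh instance, a base request can be shared safely and specialized in different ways without one caller's constraints leaking into another's.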

The request is issued to the server using the fetch method, which specifies the selection of topics required. This is an asynchronous operation and so the result is returned via a CompletableFuture. The following examples show fetch calls that block for up to five seconds awaiting the asynchronous result (SECONDS is assumed to be a static import of java.util.concurrent.TimeUnit.SECONDS).

    FetchResult<Void> result = request.fetch("*Accounts/").get(5, SECONDS);

This would return a FetchResult object containing a list of all topics below the “Accounts” topic that the caller has access to read.

You can get the result set from the result object as a list of TopicResult objects, like this:

    List<TopicResult<Void>> results = result.results();

Each result provides the topic path and type. So to print these:

    results.forEach(t -> {
        System.out.println(t.type() + " : " + t.path());
    });

In the above case, note that the result is typed to ‘Void’. This is because we have not requested topic values, and therefore have supplied no data type information.
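The blocking get(5, SECONDS) calls used throughout this article can fail in three ways: the wait can time out, the operation can complete with an error, or the waiting thread can be interrupted. The following is a minimal sketch of handling all three with a plain CompletableFuture from the JDK. The AwaitSketch class and the awaitOrDefault helper are hypothetical and not part of the Diffusion API; a real client would probably report the failure rather than silently fall back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; it works on any CompletableFuture, not only fetch results. */
final class AwaitSketch {

    /** Waits up to the given timeout and returns 'fallback' on any failure. */
    static <T> T awaitOrDefault(CompletableFuture<T> future, long timeout, TimeUnit unit, T fallback) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting for a result that did not arrive in time
            return fallback;
        } catch (ExecutionException e) {
            return fallback;     // the asynchronous operation itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return fallback;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("result");
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed

        System.out.println(awaitOrDefault(done, 5, TimeUnit.SECONDS, "fallback"));
        System.out.println(awaitOrDefault(never, 50, TimeUnit.MILLISECONDS, "fallback"));
    }
}
```

The first call prints the completed value; the second gives up after 50 milliseconds and prints the fallback.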

Fetching topic values

In order to retrieve the topic values, the request needs to be specialized to return values of a given type. For example:

    FetchResult<String> result = 
        topics.fetchRequest()
            .withValues(String.class)
            .fetch("*Accounts/").get(5, SECONDS);

This will return only those topics with values that can be read by the String data type and the result will be typed to String:

    List<TopicResult<String>> results = result.results();
    results.forEach(t -> {
        final String value = t.value(); // value typed to String
        System.out.println(t.path() + " = " + value );
    });

It may be that you do not know the value type of the topics you wish to return, but still wish to return values. We will explain how to handle that situation later.

Fetching a single topic’s value

Given that the fetch request specifies a topic selector, it can be used to return a result for just a single topic. So, for example, if the topic type is known:

    TopicResult<String> result = 
        topics.fetchRequest()
            .withValues(String.class)
            .fetch("MyTopic")
            .get(5, SECONDS)
            .results()
            .stream()
            .findFirst()
            .orElse(null);

The above statement returns a result for “MyTopic”, or null if the topic cannot be found (or the caller does not have permission to retrieve it). Once you have checked that the result is not null, you can get the value from the typed result as follows:

    String value = result.value();
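Because orElse(null) leaves the caller to check for null before calling value(), one alternative is to keep the Optional returned by findFirst(). The sketch below shows the idea on plain lists that stand in for the list returned by results(). FirstResultSketch and its sample data are hypothetical and do not use the Diffusion API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper showing Optional-based handling of a possibly empty result list. */
final class FirstResultSketch {

    /** Returns the first element of a result list, or an empty Optional if there is none. */
    static <T> Optional<T> first(List<T> results) {
        return results.stream().findFirst();
    }

    public static void main(String[] args) {
        List<String> found = Arrays.asList("value of MyTopic"); // stand-in for a one-element result list
        List<String> missing = Collections.emptyList();         // stand-in for "topic not found"

        System.out.println(first(found).orElse("<no such topic>"));   // prints value of MyTopic
        System.out.println(first(missing).orElse("<no such topic>")); // prints <no such topic>
    }
}
```

With this shape, the "not found or not permitted" case has to be handled explicitly at the call site instead of surfacing later as a NullPointerException.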

Fetching topic properties

You can also return the topic properties for each topic selected. For example:

    FetchResult<Void> result = 
        topics.fetchRequest()
            .withProperties()
            .fetch("*Accounts/").get(5, SECONDS);

In this case, you can obtain properties from the individual topic results using the specification() method:

    TopicResult<Void> topicResult = result.results().get(0);
    Map<String, String> properties = topicResult.specification().getProperties();

Filtering by topic type

The results can also be restricted to topics of a particular topic type or types:

    FetchResult<Void> result = 
        topics.fetchRequest()
            .topicTypes(EnumSet.of(TopicType.STRING, TopicType.INT64))
            .fetch("*Accounts/").get(5, SECONDS);

When values are requested, the topic types are derived from the value type specified. However, you can type the results to a particular value type and also constrain the results to particular topic types.

One situation when this is useful is when the value type covers more than one possible topic type. For example, the JSON value type includes not only JSON topics but also the primitive STRING, INT64 and DOUBLE types. Therefore, to obtain values only for JSON topics you would need to constrain the results as follows:

    FetchResult<JSON> result = 
        topics.fetchRequest()
            .withValues(JSON.class)
            .topicTypes(EnumSet.of(TopicType.JSON))
            .fetch("*Accounts/").get(5, SECONDS);

Requesting values for all types

Because Object.class is the supertype of all topic values, you can request values for all topic types as follows:

    FetchResult<Object> result = 
        topics.fetchRequest()
            .withValues(Object.class)
            .fetch("*Accounts/").get(5, SECONDS);

The type of the result values can then be checked using instanceof:

    TopicResult<Object> topicResult = result.results().get(0);
    if (topicResult.value() instanceof JSON) {
        // handle JSON result
    }

Specifying Bytes.class would also return all types:

    FetchResult<Bytes> result = 
        topics.fetchRequest()
            .withValues(Bytes.class)
            .fetch("*Accounts/").get(5, SECONDS);

However, in this case, the returned value needs to be decoded from its bytes value using the appropriate data type for the topic type.

    result.results().forEach(t -> {
        DataType<?> dataType = Diffusion.dataTypes().getByName(t.type().toString().toLowerCase());
        System.out.println(dataType.readValue(t.value()).toString());
    });

Restricting the results to a range of topics

A powerful feature of the new fetch API is the ability to restrict the results within a specified range of topics. For example:

    FetchResult<Bytes> result = 
        topics.fetchRequest()
            .withValues(Bytes.class)
            .from("Accounts/Dept05")
            .to("Accounts/Dept10")
            .fetch("*Accounts/").get(5, SECONDS);

This will return results for all topics under “Accounts” from “Accounts/Dept05” to “Accounts/Dept10” inclusive. This means that all topics within the selection that have a path that is lexically within the specified range would be returned. This would include topics at all levels. As well as “Accounts/Dept05” it would also return “Accounts/Dept05/Details” and so forth.

It is important to note that the start and end points in the range are logical positions within a sorted tree, and therefore do not need to represent topics that actually exist. You do not need to specify both start and end points. For example, you can specify a start point but no end point, in which case results up to the logical end of the topic tree would be returned.

It is also possible to specify the range as non-inclusive (using after and before methods), which is more useful for paging through ranges of topics (see below).
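The range rules above can be tried out on a small model. The following sketch uses a JDK TreeSet of path strings in place of the server's topic tree and assumes plain String ordering, which is a simplification made for illustration; the server's actual path ordering may differ in detail. RangeSketch and its sample paths are hypothetical and do not call the Diffusion API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of range selection over sorted paths (plain String order assumed). */
final class RangeSketch {

    /** Paths within [from, to], both ends inclusive. */
    static List<String> inclusive(TreeSet<String> paths, String from, String to) {
        return new ArrayList<>(paths.subSet(from, true, to, true));
    }

    /** Paths strictly between 'after' and 'before'. */
    static List<String> exclusive(TreeSet<String> paths, String after, String before) {
        return new ArrayList<>(paths.subSet(after, false, before, false));
    }

    /** Paths from 'from' (inclusive) to the logical end of the tree. */
    static List<String> fromOnly(TreeSet<String> paths, String from) {
        return new ArrayList<>(paths.tailSet(from, true));
    }

    static TreeSet<String> sample() {
        return new TreeSet<>(Arrays.asList(
            "Accounts/Dept01",
            "Accounts/Dept05",
            "Accounts/Dept05/Details",
            "Accounts/Dept07",
            "Accounts/Dept10",
            "Accounts/Dept10/Details",
            "Accounts/Dept12"));
    }

    public static void main(String[] args) {
        TreeSet<String> paths = sample();
        // Includes Accounts/Dept05/Details because it sorts inside the range.
        System.out.println(inclusive(paths, "Accounts/Dept05", "Accounts/Dept10"));
        // Neither end point exists as a topic; they are only positions in the order.
        System.out.println(inclusive(paths, "Accounts/Dept04", "Accounts/Dept08"));
        // Exclusive bounds drop the end points themselves.
        System.out.println(exclusive(paths, "Accounts/Dept05", "Accounts/Dept10"));
        // A start point with no end point runs to the end of the tree.
        System.out.println(fromOnly(paths, "Accounts/Dept10"));
    }
}
```

In this model the inclusive range ending at “Accounts/Dept10” does not include “Accounts/Dept10/Details”, because that path sorts after the end point.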

Limiting the number of results

In all of the above examples, the result set will include all topics that match the specified selector, filtered by any value or topic type constraints. However, large result sets can be inefficient, and so the maximum number of results returned can be limited as follows:

    FetchResult<String> result = 
        topics.fetchRequest()
            .withValues(String.class)
            .first(20)
            .fetch("*Accounts/").get(5, SECONDS);

In this case, the result set is limited to a maximum of 20 results. If there are potentially more results, then the FetchResult object will indicate this via the hasMore method and paging (see below) can be used to request the next block of results.

It is also possible to limit the size of the result set by specifying the maximum size of the result in bytes:

    FetchResult<String> result = 
        topics.fetchRequest()
            .withValues(String.class)
            .maximumResultSize(2048)
            .fetch("*Accounts/").get(5, SECONDS);

In this case, the result set is limited to the maximum number of topic results that can be accommodated in 2048 bytes. Note that this is not just the data size but the total size of the response. By default, this limit is the same as the maximum message size set via the session factory. As in the above case, the hasMore method will indicate whether all possible results were returned.

Paging through topics

By using a combination of ranges and limits as discussed above, it is possible to page through topics in manageable chunks. The following example shows this being done for all String topics within the whole topic tree. However, in practice, you will probably constrain this to a smaller selection.

    FetchRequest<String> request = 
        topics.fetchRequest()
            .withValues(String.class)
            .first(20);
    FetchResult<String> result = request.fetch("*.*").get(5, SECONDS);

This will return results for the first 20 topics (as a maximum). If there are more, we can fetch the next 20 topics like this:

    if (result.hasMore()) {
        List<TopicResult<String>> results = result.results();
        result = request.after(results.get(results.size() - 1).path()).fetch("*.*").get(5, SECONDS);
    }

Because the request is built upon the previous one, it retains the withValues and first limit constraints. So this will return the next 20 results which are ‘after’ the last result of the previous fetch. This can be done repeatedly to return topics in lists of size 20 until hasMore indicates that there are no more.
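The repeated "fetch after the last path until hasMore is false" pattern can be written as a loop. The sketch below models it with a JDK NavigableSet in place of the server, so it runs without a Diffusion session. ForwardPagingSketch, Page and fetchAfter are hypothetical stand-ins for the request, the result and the after(...).fetch(...) call, and plain String ordering of paths is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical model of forward paging over a sorted set of topic paths. */
final class ForwardPagingSketch {

    /** Stand-in for one page of results plus a 'has more' flag. */
    static final class Page {
        final List<String> paths;
        final boolean hasMore;

        Page(List<String> paths, boolean hasMore) {
            this.paths = paths;
            this.hasMore = hasMore;
        }
    }

    /** Returns up to 'limit' paths strictly after 'after' (null means start of tree). */
    static Page fetchAfter(NavigableSet<String> tree, String after, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        NavigableSet<String> remaining = (after == null) ? tree : tree.tailSet(after, false);
        List<String> page = new ArrayList<>();
        for (String path : remaining) {
            if (page.size() == limit) {
                return new Page(page, true); // at least one further path exists
            }
            page.add(path);
        }
        return new Page(page, false);
    }

    /** Pages through the whole tree and records the size of each page. */
    static List<Integer> pageSizes(NavigableSet<String> tree, int limit) {
        List<Integer> sizes = new ArrayList<>();
        Page page = fetchAfter(tree, null, limit);
        sizes.add(page.paths.size());
        while (page.hasMore) {
            String last = page.paths.get(page.paths.size() - 1);
            page = fetchAfter(tree, last, limit); // next page starts after the last path seen
            sizes.add(page.paths.size());
        }
        return sizes;
    }

    static NavigableSet<String> sampleTree(int count) {
        NavigableSet<String> tree = new TreeSet<>();
        for (int i = 1; i <= count; i++) {
            tree.add(String.format("Accounts/Dept%02d", i));
        }
        return tree;
    }

    public static void main(String[] args) {
        System.out.println(pageSizes(sampleTree(45), 20)); // prints [20, 20, 5]
    }
}
```

Using the last path of the previous page as an exclusive lower bound means no topic is returned twice, even if topics are added or removed between requests.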

Reverse paging is also possible. So for the last result set returned above, you can go backwards as follows:

    request = request.last(20); // reverses the order
    result = request.before(result.results().get(0).path()).fetch("*.*").get(5, SECONDS);

As before, this can be called repeatedly until the result indicates that there are no more. Note that the last(20) method indicates that the last 20 topics in the selection are required, but they will be returned in ascending (forwards) sorted order.
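The point that a "last N before X" request still returns its results in ascending order can be seen in a small model. This sketch again uses a JDK NavigableSet with plain String ordering in place of the server. ReversePagingSketch and lastBefore are hypothetical names that only mimic the combined effect of last(n) and before(path).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical model of reverse paging over a sorted set of topic paths. */
final class ReversePagingSketch {

    /** Up to 'limit' paths strictly before 'before', returned in ascending order. */
    static List<String> lastBefore(NavigableSet<String> tree, String before, int limit) {
        List<String> page = new ArrayList<>();
        // Walk backwards from just before the bound, collecting at most 'limit' paths.
        Iterator<String> it = tree.headSet(before, false).descendingIterator();
        while (it.hasNext() && page.size() < limit) {
            page.add(it.next());
        }
        Collections.reverse(page); // present the page in forward (ascending) order
        return page;
    }

    static NavigableSet<String> sampleTree() {
        return new TreeSet<>(Arrays.asList(
            "Topics/a", "Topics/b", "Topics/c", "Topics/d", "Topics/e", "Topics/f"));
    }

    public static void main(String[] args) {
        NavigableSet<String> tree = sampleTree();
        List<String> page = lastBefore(tree, "Topics/e", 2);
        System.out.println(page);                             // prints [Topics/c, Topics/d]
        System.out.println(lastBefore(tree, page.get(0), 2)); // prints [Topics/a, Topics/b]
    }
}
```

Stepping back another page uses the first path of the current page as the new exclusive upper bound, mirroring the way forward paging uses the last path.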

This forward and reverse paging capability is very useful for client applications that need to present results from a large set within a limited window in a user interface.

Summary

In conclusion, the new fetch API provides a more usable and powerful interface, not only for retrieving topic values but also for retrieving the specification of each topic. The new range capabilities make it easier to develop applications that need to present the user with a view into a large dataset.

