Play Back Historical Data with Time Series Topics

11 Jan 18

Introduction

Companies worldwide choose Diffusion because it is an intelligent, reliable, highly scalable, real-time application data delivery platform, perfect for handling time-sensitive or mission-critical data via web, mobile, and satellite networks. While real-time data delivery is crucial for applications like e-gaming, stock trading, and IoT device monitoring, past event data is also valuable.

With the Diffusion 6.0 time series topic functionality, a log of past updates to a topic can be maintained and queried based on a view range of the events retained in the topic. This function allows a session to connect to a Diffusion server and immediately start consuming the latest data, while also gathering all historical data, with no need to cache data on the client side.

The following example demonstrates the capabilities of time series topics in a real-world scenario: visualization of real-time motor racing with an instant replay feature.

Telemetry data from the race cars, a real-time map of the race track, and a constantly up-to-date leaderboard feed the real-time display using Diffusion’s time series topics.

Tech Stack

In this example, we use a Java control client to create the topic tree and publish telemetry data pulled from a simulation of a race, with an Angular 1.6 front end to display the data to the end user.

Backend

The Java application is built with Maven to pull in all dependencies, including Diffusion 6.0.2. To include the Diffusion Java library in our pom, we add:

<repositories>
    <repository>
        <id>push-repository</id>
        <url>https://download.pushtechnology.com/maven/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>com.pushtechnology.diffusion</groupId>
        <artifactId>diffusion-client</artifactId>
        <version>6.0.2</version>
    </dependency>
</dependencies>

We also include Spark as a simple web server to serve the frontend files.

Frontend

The Angular application uses npm for dependency management and is packaged with browserify to create a browser-friendly bundle from our code. To make Diffusion available across our Angular modules, we first install version 6.0.2 with npm and save it to our package.json:

npm install --save diffusion@6.0.2

Then we wrap it in an Angular service, making available the Diffusion features we use, like this:

var app = require('angular').module('racing');
var diffusion = require('diffusion');

app.factory('Diffusion', ['$state', function($state) {
    var Diffusion = {
        _session : null
    };

    Diffusion.connect = function(url, done) {
        diffusion.connect(url).then(function(session) {
            Diffusion._session = session;
            done();
        }, function(error) {
            // ...
        });
    };

    Diffusion.session = function() {
        if (!this._session || !this._session.isConnected()) {
            $state.go('connecting');
            return false;
        }
        return this._session;
    };

    Diffusion.datatypes = diffusion.datatypes;

    return Diffusion;
}]);

We also use UI-Router to manage the state of the application. For this demo, we use it to ensure we have a connection to Diffusion, by redirecting to the ‘connecting’ state if the session is accessed but does not exist or is not connected. We also expose datatypes to allow us to construct typed streams for our subscriptions.

Now, whenever we need Diffusion elsewhere in our Angular application, we inject it like any other Angular service:

app.controller('MyController', ['Diffusion', function(Diffusion) {
    if (Diffusion.session()) {
        Diffusion.session().subscribe('cool/topic');
    }
}]);

Finally, we browserify the whole thing into a single file and include this in our HTML document.

Updating Time Series Topics with Java

We created a single time series topic for our demo and published the telemetry data to it. We appended new data at a fixed update period (50 ms in this demo) and configured the time series topic to retain the last 10 minutes' worth of events. Each event in the time series topic contains a JSON document, using the Diffusion JSON data type, and the telemetry data is simply a single JSON array with each element representing data for a car in the race.
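As a rough sizing check, the update period and the retention window together bound how many events the topic holds at any one time. The sketch below is only illustrative: maxRetainedEvents is a hypothetical helper, and it assumes exactly one event is appended per update period.

```javascript
// Hypothetical helper (not part of the demo): upper bound on the number of
// retained events, assuming one event is appended per update period.
function maxRetainedEvents(retainedSeconds, updatePeriodMs) {
    return Math.floor((retainedSeconds * 1000) / updatePeriodMs);
}

// 10 minutes of retention at one event every 50 ms
var retained = maxRetainedEvents(600, 50);
```

Because each event carries the telemetry for every car, this count multiplied by the size of one JSON array gives a first estimate of the data held by the topic.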

The position, along with other telemetry data, is updated for every car in real-time

To create the time series topic, we must first configure it using the topic specification properties TIME_SERIES_EVENT_VALUE_TYPE and TIME_SERIES_RETAINED_RANGE:

final TopicSpecification specification = topicControl.newSpecification(TopicType.TIME_SERIES)
    .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, JSON_DATATYPE_NAME)
    .withProperty(TopicSpecification.TIME_SERIES_RETAINED_RANGE, "last 600s");

We can then create the time series topic using the new async API and the aforementioned topic specification:

topicControl.addTopic("race/updates", specification)
    .thenAccept(result -> {
        // The topic now exists; here we can start pushing data to it
    });

In our update loop, we create the necessary JSON values and append them to the end of the time series topic as a new event. This is done using the TimeSeries feature that can be obtained like so:

final TimeSeries timeSeries = session.feature(TimeSeries.class);

Appending an event to the end of the time series topic is as simple as calling append:

final JSON jsonValue = Diffusion.dataTypes().json()
    .fromJsonString("... json formatted telemetry data ...");

timeSeries.append("race/updates", JSON.class, jsonValue);

That’s all the magic we need. Our client is then able to query the time series topic for past events, which we’ll explain next.

Querying Time Series Topics with JavaScript

Subscribing to the latest data

Subscribing to a time series topic is not much different to subscribing to any other topic using a typed value stream, with the exception that the value is wrapped in a time series event. Accessing the value is done by calling the ‘get’ method on the value property from the event:

Diffusion.session().stream('race/updates').asType(Diffusion.datatypes.json())
    .on('value', function(topic, spec, event) {
        var val = event.value.get();
        // ... update view logic with the latest snapshot
    });
Diffusion.session().subscribe('race/updates');

Here we are attaching a JSON stream to the telemetry time series topic and subscribing, then pulling the value from the event and updating the view logic with the latest data. This is all we need to do in order to get our ‘live playback’ feature up and running.

Each car has a lap count and timer, which allows for a real-time updating leaderboard to be shown

If we want to peek back at events that have already occurred, we need to use the range query API.

Getting a view on historical data

For this demonstration, we configured our time series topic to retain the last 10 minutes' worth of events, but the amount of data retained can easily be configured to suit the application.

The scrubber at the bottom of the page allows for seeking through the available timeline to view past events

The first thing we need to know in order to display a time scrubber in our UI is the time range available. Obviously, the most recent value is ‘now’, so we can use the timestamp of the latest event coming through on our value stream to set the current time. We also need to know how far back we can scroll the scrubber, so we must find the timestamp of the earliest event retained in our time series topic. To do this, we can use a range query, built with a view originating at the start and selecting the next event:

Diffusion.session().timeseries.rangeQuery()
    .fromStart()
    .next(1)
    .as(Diffusion.datatypes.json())
    .selectFrom('race/updates').then(function(result) {
        var startTime = result.events[0].timestamp;
        // ...
    }, function(error) {
        // ...
    });

This lets us set the earliest time to scroll to in the UI, and now we have a time range from start to finish. We use these values as the lower and upper bounds on a slider provided by angularjs-slider, and update the values with a scheduled range query. The tick size on the slider can be set to limit the granularity of the scrubber – we set it to a value equivalent to the update period, around 50 ms.
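The mapping from the two timestamps to slider bounds can be sketched as follows. This is a minimal illustration rather than the demo's actual code: sliderOptions and snapToTick are hypothetical helpers, timestamps are assumed to be numbers in milliseconds, and the option names floor, ceil and step are assumed to match those accepted by angularjs-slider.

```javascript
var UPDATE_PERIOD_MS = 50;

// Hypothetical helper: build slider bounds from the earliest retained
// timestamp and the latest timestamp seen on the value stream.
function sliderOptions(startTime, latestTime) {
    return {
        floor: startTime,
        ceil: Math.max(startTime, latestTime),
        step: UPDATE_PERIOD_MS
    };
}

// Hypothetical helper: clamp a time to the available range, then snap it
// to the nearest tick measured from the start of the range.
function snapToTick(time, options) {
    var clamped = Math.min(Math.max(time, options.floor), options.ceil);
    var ticks = Math.round((clamped - options.floor) / options.step);
    return Math.min(options.floor + ticks * options.step, options.ceil);
}
```

Snapping to the update period keeps scrubber positions aligned with the times at which events were actually appended, so neighbouring positions are less likely to resolve to the same event.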

The scrubber can be dragged along the timeline, which triggers range query requests on the time series topic

As the scrubber is moved along the time axis, we watch for the value of the slider to change, and we use this value to send a range query for the state at the time we have scrubbed to:

Diffusion.session().timeseries.rangeQuery()
    .from(new Date(ClockModel.getViewTime()))
    .next(1)
    .as(Diffusion.datatypes.json())
    .selectFrom('race/updates').then(function(result) {
        var val = result.events[0].value.get();
        // ...
    }, function(err) {
        // ...
    });

This time we grab the current value of the scrubber from our ClockModel to build a range query using the ‘from’ method to select events starting from the Date object we provide.
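One case worth guarding against is a query that selects no events, for example if the scrubber sits at or beyond the most recent event; result.events[0] would then be undefined. A small defensive sketch, where firstValueOrDefault is a hypothetical helper and the result shape (an events array whose entries expose value.get()) is taken from the snippets above:

```javascript
// Hypothetical helper: return the value of the first selected event, or a
// fallback when the query result contains no events.
function firstValueOrDefault(result, fallback) {
    if (!result || !result.events || result.events.length === 0) {
        return fallback;
    }
    return result.events[0].value.get();
}
```

In the success callback, the view could then keep showing the previous snapshot whenever the fallback is returned instead of throwing on a missing event.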

Now we have built a scrubber that lets us seek to a specific time on the available timeline, but we’re missing one key part – playback. In this demo we’ve gone with a simple implementation: incrementing the time on the scrubber and performing a range query for each frame in the animation. A more sophisticated way to go about this would be to implement a form of buffering, where we grab chunks of data with a range query and iterate through the events based on the timestamp, gathering another chunk of data as we make progress through the existing events. This would reduce the need to make a range query for every frame (in this case every 50 ms), and would allow for smoother playback on poor networks.

In terms of the range query, this could be achieved by replacing

.next(1)

with

.nextMillis(5000)

to get 5 seconds' worth of data, for example. The view logic would also need modifying to progress through the events at a rate consistent with the timestamps.
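The view-side half of such a buffering scheme might look like the sketch below. It is not part of the demo: PlaybackBuffer is a hypothetical name, and it assumes each event exposes a numeric timestamp in milliseconds and that the events of a chunk arrive sorted in ascending time order.

```javascript
// Hypothetical buffer holding one chunk of events fetched by a range query.
function PlaybackBuffer(events) {
    this.events = events;
    this.index = 0;
}

// Return the latest buffered event whose timestamp is <= viewTime, or null
// if the buffer is empty or viewTime precedes the first buffered event.
PlaybackBuffer.prototype.eventAt = function(viewTime) {
    if (this.events.length === 0 || this.events[0].timestamp > viewTime) {
        return null;
    }
    // Restart the scan if the view time has moved backwards
    if (this.events[this.index].timestamp > viewTime) {
        this.index = 0;
    }
    while (this.index + 1 < this.events.length &&
           this.events[this.index + 1].timestamp <= viewTime) {
        this.index++;
    }
    return this.events[this.index];
};

// True when the buffer is empty or playback is within marginMs of the end
// of the chunk, signalling that the next chunk should be requested.
PlaybackBuffer.prototype.needsRefill = function(viewTime, marginMs) {
    if (this.events.length === 0) {
        return true;
    }
    var last = this.events[this.events.length - 1].timestamp;
    return last - viewTime <= marginMs;
};
```

On each animation frame the view would call eventAt with the current playback time, and issue the next range query once needsRefill returns true, so that the following chunk is available before the current one runs out.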

Other features

A graph of lap times for a specific driver is populated using request-response messaging

As well as the time series topic data that forms the core of the demo, we also make use of another new feature, request-response messaging, to pull the lap times for a selected driver on request and display them in a graph in the UI. This makes up only a small part of the demo, so we won’t cover the details in this post. We’ll post a more in-depth demonstration of request-response messaging soon.

Source code

All of the source code for this demo can be found on GitHub.

Further reading

Designing your application with time series topics

Time series topics developer examples

Diffusion Java API documentation

Diffusion JavaScript API documentation


