Diffusion for Developers

Create real-time applications that deliver hyper-personalized data to millions of concurrent consumers.

Developers hero image

Remove the limits on team productivity – rapidly develop and connect applications with your choice of language, protocols and APIs.

PUBLISH

// Connection settings for the Diffusion server; replace the placeholders.
const connectionOptions = {
    host: "<HOST>",
    port: "<PORT>",
    principal: "<PRINCIPAL>",
    credentials: "<PASSWORD>"
};

// Open the session that every later call is made through.
const session = await diffusion.connect(connectionOptions);

// Create a JSON topic at the chosen path.
const jsonTopicType = diffusion.topics.TopicType.JSON;
await session.topics.add('<TOPIC_PATH>', jsonTopicType);

// Publish a JSON value to the topic.
const value = { foo: "bar" };
const jsonDataType = diffusion.datatypes.json();
await session.topicUpdate.set("<TOPIC_PATH>", jsonDataType, value);
SUBSCRIBE

// Connection settings for the Diffusion server; replace the placeholders.
const connectionOptions = {
    host: "<HOST>",
    port: "<PORT>",
    principal: "<PRINCIPAL>",
    credentials: "<PASSWORD>"
};

const session = await diffusion.connect(connectionOptions);

// Register a JSON value stream before selecting the topic, then log each
// value the stream reports.
const valueStream = session.addStream('<TOPIC_PATH>', diffusion.datatypes.json());
valueStream.on('value', (topic, specification, newValue, oldValue) => {
    console.log('New value:', newValue.get());
});

// Subscribe the session to the topic so the stream starts receiving values.
await session.select('<TOPIC_PATH>');
PUBLISH

// Open a session with the given principal and password.
// The server URL takes the form ws://host:port, with no trailing colon.
var session = Diffusion.Sessions
    .Principal("<PRINCIPAL>")
    .Password("<PASSWORD>")
    .Open("ws://<HOST>:<PORT>");

// Create a JSON topic at the chosen path and wait for the request to complete.
await session.TopicControl.AddTopicAsync("<TOPIC_PATH>", TopicType.JSON);

// Build the JSON value to publish.
var jsonDataType = Diffusion.DataTypes.JSON;
var value = jsonDataType.FromJSONString("{\"foo\": \"bar\"}");

// Await the update so that a failure surfaces here rather than being lost
// in a discarded task.
await session.TopicUpdate.SetAsync("<TOPIC_PATH>", value);
SUBSCRIBE

// Open a session with the given principal and password.
// The server URL takes the form ws://host:port, with no trailing colon.
var session = Diffusion.Sessions
    .Principal("<PRINCIPAL>")
    .Password("<PASSWORD>")
    .Open("ws://<HOST>:<PORT>");

// Register the value stream before subscribing, so it is in place when the
// subscription is made.
session.Topics.AddStream("<TOPIC_PATH>", new ExampleValueStream());

// Subscribe the session to the topic and wait for the request to complete.
_ = await session.Topics.SubscribeAsync("<TOPIC_PATH>");

/// <summary>
/// Value stream for JSON topics that writes subscription events, value
/// updates and stream errors to the console.
/// </summary>
class ExampleValueStream : IValueStream<IJSON>
{
    /// <summary>Reports the topic path that has been subscribed to.</summary>
    public void OnSubscription(string topicPath, ITopicSpecification specification)
    {
        Console.WriteLine($"Subscribed to: {topicPath}");
    }

    /// <summary>Reports the topic path that has been unsubscribed from.</summary>
    public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
    {
        Console.WriteLine($"Unsubscribed from: {topicPath}");
    }

    /// <summary>Prints the new value of the topic as a JSON string.</summary>
    public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
    {
        Console.WriteLine($"{topicPath}: {newValue.ToJSONString()}");
    }

    /// <summary>Called when the stream is closed; nothing to release here.</summary>
    public void OnClose()
    {
        // Not used
    }

    /// <summary>Reports a stream error instead of silently discarding it.</summary>
    public void OnError(ErrorReason errorReason)
    {
        Console.WriteLine($"Stream error: {errorReason}");
    }
}
PUBLISH

// Open a session with the given principal and password.
final Session session = Diffusion.sessions()
    .principal("<PRINCIPAL>")
    .password("<PASSWORD>")
    .open("ws://<HOST>:<PORT>");

// Create a JSON topic at the chosen path.
final TopicControl topicControl = session.feature(TopicControl.class);
topicControl.addTopic("<TOPIC_PATH>", TopicType.JSON);

// Parse the JSON document to publish, then set it as the topic's value.
final JSON jsonValue = Diffusion.dataTypes().json().fromJsonString("{\"hello\": \"world\"}");
session.feature(TopicUpdate.class).set("<TOPIC_PATH>", JSON.class, jsonValue);
SUBSCRIBE

// Open a session with the given principal and password.
final Session session = Diffusion.sessions()
    .principal("<PRINCIPAL>")
    .password("<PASSWORD>")
    .open("ws://<HOST>:<PORT>");

// Value stream that prints subscription events and every new JSON value.
final Topics.ValueStream<JSON> valueStream = new Topics.ValueStream.Default<JSON>() {
    @Override
    public void onSubscription(String topicPath, TopicSpecification specification) {
        System.out.println("on subscription: " + topicPath);
    }

    @Override
    public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) {
        System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
    }

    @Override
    public void onUnsubscription(String topicPath, TopicSpecification specification, UnsubscribeReason reason) {
        System.out.println("on unsubscription: " + topicPath + ": " + reason.toString());
    }
};

// Register the stream first, then subscribe the session to the topic.
final Topics topics = session.feature(Topics.class);
topics.addStream("<TOPIC_PATH>", JSON.class, valueStream);
topics.subscribe("<TOPIC_PATH>");
PUBLISH

// Server URL and credentials; replace the placeholders with real values.
// The force unwrap is deliberate: an unparsable literal is a programmer error.
let url = URL(string: "ws://<HOST>:<PORT>")!
let credentials = PTDiffusionCredentials(password: "<PASSWORD>")
let config = PTDiffusionSessionConfiguration(principal: "<PRINCIPAL>",
                                             credentials: credentials)

// Shared completion handler: prints the error, if any, and ignores the response.
let errorHandler: (Any?, Error?) -> Void = { _, error in
    if let error = error {
        print(error)
    }
}

PTDiffusionSession.open(with: url,
                        configuration: config) { (session, error) in

    // Report failure instead of crashing on a force unwrap when the session
    // could not be opened.
    guard error == nil, let session = session else {
        print("An error has occurred: \(error?.localizedDescription ?? "no session was returned")")
        return
    }

    // Create a JSON topic at the chosen path.
    session.topicControl.addTopic(withPath: "<PATH>",
                                  type: PTDiffusionTopicType.JSON,
                                  completionHandler: errorHandler)

    // Build the JSON value and publish it; a value that cannot be encoded is
    // reported rather than trapping via `try!`.
    do {
        let value = try PTDiffusionJSON(object: ["foo": "bar"])

        session.topicUpdate.setWithPath("<PATH>",
                                        toJSONValue: value) { error in errorHandler(nil, error) }
    } catch {
        print("Unable to create JSON value: \(error)")
    }
}
SUBSCRIBE

/// Value stream delegate for JSON topics that logs subscription events,
/// stream failures and each value update to standard output.
class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate {

    /// Logs the topic path the stream has been unsubscribed from.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didUnsubscribeFromTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         reason: PTDiffusionTopicUnsubscriptionReason) {
        print("Unsubscribed from: \(topicPath)")
    }

    /// Logs the error the stream failed with.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Failed with error: \(error)")
    }

    /// Logs that the stream has closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Closed")
    }

    /// Logs the topic path the stream has been subscribed to.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didSubscribeToTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        print("Subscribed to: \(topicPath)")
    }

    /// Logs the new value of a topic, which is expected to be a JSON object.
    ///
    /// A value that cannot be decoded, or that is not a JSON object (an array
    /// or a scalar, for example), is reported as unreadable. The conditional
    /// cast matters here: a forced cast would trap, and `catch` cannot
    /// intercept a trap.
    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldJSON: PTDiffusionJSON?,
                         newJSON: PTDiffusionJSON) {
        do {
            guard let value = try newJSON.object() as? [String: Any] else {
                print("Unable to read message")
                return
            }
            print("\(topicPath): \(value.description)")
        }
        catch {
            print("Unable to read message")
        }
    }
}

// Server URL and credentials; replace the placeholders with real values.
// The force unwrap is deliberate: an unparsable literal is a programmer error.
let url = URL(string: "ws://<HOST>:<PORT>")!
let credentials = PTDiffusionCredentials(password: "<PASSWORD>")
let config = PTDiffusionSessionConfiguration(principal: "<PRINCIPAL>",
                                             credentials: credentials)

// Shared completion handler: prints the error, if any, and ignores the response.
let errorHandler: (Any?, Error?) -> Void = { _, error in
    if let error = error {
        print(error)
    }
}

PTDiffusionSession.open(with: url,
                        configuration: config) { (session, error) -> Void in
    // Report failure instead of crashing on a force unwrap when the session
    // could not be opened.
    guard error == nil, let session = session else {
        print("An error has occurred: \(error?.localizedDescription ?? "no session was returned")")
        return
    }

    let delegate = StreamDelegate()

    let selector = PTDiffusionTopicSelector(expression: "<PATH>")
    let stream = PTDiffusionJSON.valueStream(with: delegate)

    // Register the value stream for the selector; a registration failure is
    // reported rather than trapping via `try!`.
    do {
        try session.topics.add(stream, with: selector, error: ())
    } catch {
        print("Unable to add value stream: \(error)")
        return
    }

    // Subscribe the session to the topic so the stream starts receiving values.
    session.topics.subscribe(withTopicSelectorExpression: "<PATH>") { error in errorHandler(nil, error) }
}
PUBLISH

// Define the callback functions
/*
 * Callback for the add-and-set request: reaching it means the topic has
 * been added. Neither argument is inspected.
 */
static int on_topic_creation_result(DIFFUSION_TOPIC_CREATION_RESULT_T result, void *context)
{
        (void)result;
        (void)context;
        return HANDLER_SUCCESS;
}

// Create a session factory and set the principal and password that the
// session will authenticate with.
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "<PRINCIPAL>");
diffusion_session_factory_password(session_factory, "<PASSWORD>");

// Create a new session against the given server URL.
// NOTE(review): the returned pointer is used without a NULL check — confirm
// how session_create_with_session_factory reports a failed connection.
SESSION_T *session = session_create_with_session_factory(session_factory, "ws://<HOST>:<PORT>");

// Write the JSON document (here a JSON string) into a buffer that becomes
// the topic's value.
// NOTE(review): the result of write_diffusion_json_value is not checked.
BUF_T *buf = buf_create();
write_diffusion_json_value("\"hello world\"", buf);

// Describe a JSON topic, then bundle the path, specification, value and
// completion callback into the request parameters.
TOPIC_SPECIFICATION_T *spec = topic_specification_init(TOPIC_TYPE_JSON);
DIFFUSION_TOPIC_UPDATE_ADD_AND_SET_PARAMS_T topic_add_and_set_params = {
        .topic_path = "<PATH>",
        .specification = spec,
        .datatype = DATATYPE_JSON,
        .update = buf,
        .on_topic_update_add_and_set = on_topic_creation_result
};

// Add a topic and set its value
// NOTE(review): buf, spec, session_factory and session are not released in
// this snippet — presumably the surrounding program frees them; confirm.
diffusion_topic_update_add_and_set(session, topic_add_and_set_params);
SUBSCRIBE

// Define the callback functions
/*
 * Value stream callback: the stream is now subscribed to `topic_path`.
 * No argument is inspected.
 */
static int on_subscription(const char* topic_path, const TOPIC_SPECIFICATION_T *specification, void *context)
{
        (void)topic_path;
        (void)specification;
        (void)context;
        return HANDLER_SUCCESS;
}

/*
 * Value stream callback: the stream is now unsubscribed from `topic_path`.
 * No argument is inspected.
 */
static int on_unsubscription(const char* topic_path,
                             const TOPIC_SPECIFICATION_T *specification,
                             NOTIFY_UNSUBSCRIPTION_REASON_T reason,
                             void *context)
{
        (void)topic_path;
        (void)specification;
        (void)reason;
        (void)context;
        return HANDLER_SUCCESS;
}

/*
 * Value stream callback for a value update on `topic_path`. This is the
 * place to read the update; the placeholder inspects no argument.
 */
static int on_value(const char* topic_path,
                    const TOPIC_SPECIFICATION_T *const specification,
                    const DIFFUSION_DATATYPE datatype,
                    const DIFFUSION_VALUE_T *const old_value,
                    const DIFFUSION_VALUE_T *const new_value,
                    void *context)
{
        (void)topic_path;
        (void)specification;
        (void)datatype;
        (void)old_value;
        (void)new_value;
        (void)context;
        return HANDLER_SUCCESS;
}

/* Value stream callback: the stream has been closed. Nothing to do here. */
static void on_close()
{
}

// Create a session factory and set the principal and password that the
// session will authenticate with.
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "<PRINCIPAL>");
diffusion_session_factory_password(session_factory, "<PASSWORD>");

// Create a new session against the given server URL.
// NOTE(review): the returned pointer is used without a NULL check — confirm
// how session_create_with_session_factory reports a failed connection.
SESSION_T *session = session_create_with_session_factory(session_factory, "ws://<HOST>:<PORT>");

// Wire the callbacks defined above into a JSON value stream and register it
// for the topic path before subscribing.
VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscription,
        .on_unsubscription = on_unsubscription,
        .on_value = on_value,
        .on_close = on_close
};
add_stream(session, "<PATH>", &value_stream);

// Subscribe the session to the topic selector.
// NOTE(review): session_factory and session are not released in this
// snippet — presumably the surrounding program frees them; confirm.
SUBSCRIPTION_PARAMS_T params = {
        .topic_selector = "<PATH>"
};
subscribe(session, params);
PUBLISH

# Add a topic and set its value.
# Data type used by main() both to create the topic and as the specification
# when setting its value.
topic_type = diffusion.datatypes.STRING
# Value that main() publishes to the topic.
value = "Value1"

async def main(
    topic_selector="<TOPIC_PATH>",
    host="<HOST>",
    port="<PORT>",
    principal="<PRINCIPAL>",
    password="<PASSWORD>",
    protocol="wss"
):
    """Open a session, add the topic, then set its value.

    Prints whether the topic was created or already existed, and confirms
    the value once it has been set. The arguments default to placeholders
    that must be replaced with real connection details.
    """
    server_url = f"{protocol}://{host}:{port}"

    async with diffusion.Session(
        url=server_url,
        principal=f"{principal}",
        credentials=diffusion.Credentials(f"{password}"),
    ) as session:

        add_response = await session.topics.add_topic(topic_selector, topic_type)

        # Report the outcome of the add request.
        if add_response == session.topics.CREATED:
            print(f"Topic {topic_selector} successfully created.")
        if add_response == session.topics.EXISTS:
            print(f"Topic {topic_selector} already exists.")

        # Publish the value defined at module level.
        await session.topics.set_topic(topic_selector, value, specification=topic_type)
        print(f"Topic {topic_selector} successfully set to {value}")
SUBSCRIBE

# Subscribe to topics.
def on_update(*, old_value, topic_path, topic_value, **kwargs):
    """Print a topic's initial value or its old and new values.

    An ``old_value`` of ``None`` is treated as the first value received for
    the topic. Extra keyword arguments are accepted and ignored.
    """
    print("Topic:", topic_path)
    if old_value is not None:
        print("  Value updated")
        print("    Old value:", old_value)
        print("    New value:", topic_value)
        return
    print("  Initial value:", topic_value)

def on_subscribe(*, topic_path, **kwargs):
    """Print the topic path that has been subscribed to; extra keywords are ignored."""
    message = f"Subscribed to {topic_path}"
    print(message)

def on_unsubscribe(*, reason, topic_path, **kwargs):
    """Print the topic path that was unsubscribed from and the reason; extra keywords are ignored."""
    message = f"Unsubscribed from {topic_path} because {str(reason)}"
    print(message)

# Data type of the values the stream handles.
topic_type = diffusion.datatypes.STRING

# Time main() stays subscribed: it passes this to asyncio.sleep() before
# unsubscribing.
session_duration = 15

# Value stream handler that routes update, subscribe and unsubscribe events
# to the callbacks defined above.
value_stream = diffusion.features.topics.ValueStreamHandler(
    data_type=topic_type,
    update=on_update,
    subscribe=on_subscribe,
    unsubscribe=on_unsubscribe,
)

async def main(
    topic_selector="<TOPIC_PATH>",
    host="<HOST>",
    port="<PORT>",
    principal="<PRINCIPAL>",
    password="<PASSWORD>",
    protocol="wss"
):
    """Open a session, subscribe to the topic for a while, then unsubscribe.

    Registers the module-level ``value_stream`` for ``topic_selector``,
    subscribes, sleeps for ``session_duration`` and then unsubscribes. The
    arguments default to placeholders that must be replaced with real
    connection details.
    """
    async with diffusion.Session(
        url=f"{protocol}://{host}:{port}",
        principal=f"{principal}",
        credentials=diffusion.Credentials(f"{password}"),
    ) as session:
        # Everything that uses the session must sit inside the `async with`
        # body; left unindented, the function fails to parse.
        print("Adding value stream")
        session.topics.add_value_stream(
            topic_selector=topic_selector, stream=value_stream
        )

        print(f"Subscribing to {topic_selector}")
        await session.topics.subscribe(topic_selector)
        await asyncio.sleep(session_duration)

        print(f"Unsubscribing from {topic_selector}")
        await session.topics.unsubscribe(topic_selector)

Consume, Transform and Deliver Data with Intelligence and Ease

Consumption icon

CONSUME

Quickly integrate any data source using adapters that simplify connecting data streams from Kafka, REST, MQTT and more.

Adapters

  • Cloud REST Adapters poll external REST services and import data to a topic path of your choice.
  • Kafka Adapter lets you easily
    • Integrate real-time data from remote Kafka clusters into Diffusion.
    • Filter and aggregate data with flexible integration.
    • Connect securely using SSL or SASL. Map a Kafka data stream to and from a Diffusion time series, monitoring activity with Prometheus.

Protocols

  • MQTT Support enables direct connection of IoT devices using MQTT 5.0.
  • REST API is ideal for one-off updates or data snapshots, suitable for low-power devices and can be called with any language capable of HTTP requests.

SDKs

  • SDKs stream the latest real-time data from your client app using a persistent session.
Wrangling icon

TRANSFORM

Diffusion transforms data in-flight, processes and segments data in real time, stores event streams for querying and editing, and streams real-time data from your clients’ apps.

Hyper-Personalize Data

  • Easily transform and map incoming data with low-code topic views—or implement advanced application logic with control clients.
  • Aggregate multiple incoming values into one topic to turn a clickstream into a customer.
  • Expand a single incoming data point and generate subtopics to make a list of prices into markets.
  • Tailor data for delivery to each service or region and even provide a custom feed for each end user.

TIME SERIES

  • Store a time-stamped event within a single topic.
  • Stream events as they happen or query to retrieve part of a series.
  • Enable non-destructive updates to update a posted event while maintaining a full audit trail.

Delayed Feed

  • Topic views can be used to create a delayed feed and are ideal for creating lower-value versions of time-sensitive data. You can replay every data change with custom delays of as little as one second or multiple days.
Distribution icon

DELIVER

Diffusion features patented delta streaming technology that minimizes costs, controls data access down to individual end users to maximize security, and uses remote topic views to efficiently distribute data across a geographically dispersed user base.

Delta Streaming

  • Intelligently distinguishes between old, updated and new data, sending only recent, relevant information to clients instead of the entire topic content. A 90% reduction in server and bandwidth requirements is achieved by avoiding the need to send data that isn’t changing from one update to the next.

Fine-Grained Security

  • Fine-grained dynamic security permissions can be provided for each topic.
  • Unique security permissions can be assigned to each user, scalable to hundreds of thousands of users.
  • Access changes are applied immediately giving you real-time control of what data each user can see.

Remote Topic Views

  • Copy all or part of the topic tree on one server to another.
  • Ideal for distributing data across a geographically dispersed user base.
  • Minimize latency and bandwidth by serving data from a local server.
  • Easy set up and modification from the Diffusion monitoring console or via SDKs.

Diffusion SDKs

Available for immediate download. Need help or direction? Book a call with one of our architects.

Learn About the Diffusion Intelligent Data Platform

Whether you’re using Diffusion now or just getting your feet wet, our learning resources are available to help anytime.

Support

Need a hand or a few pointers? Contact our support team.

Start publishing real-time data with Diffusion free!