Publish
// Create the JSON topic 'my-topic', then set its value.
session.topics.add('my-topic', diffusion.topics.TopicType.JSON);

var value = { foo : 'bar' };
var jsonType = diffusion.datatypes.json();
var result = session.topicUpdate.set('my-topic', jsonType, value);
// Create the JSON topic "my-topic" and wait for the add to complete.
await session.TopicControl.AddTopicAsync( "my-topic", TopicType.JSON );

// Build the JSON value; inner quotes must be escaped inside a C# string literal.
var jsonDataType = Diffusion.DataTypes.JSON;
var value = jsonDataType.FromJSONString("{\"foo\": \"bar\"}");

// Publish the value to the topic.
var result = session.TopicUpdate.SetAsync(
    "my-topic",
    value);
// Create the JSON topic "my-topic".
session.feature(TopicControl.class).addTopic(
    "my-topic",
    TopicType.JSON,
    new TopicControl.AddCallback.Default());

// Build the JSON value; inner quotes must be escaped inside a Java string literal.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\"}");

// Publish the value to the topic.
final CompletableFuture<?> result =
    session.feature(TopicUpdate.class).set("my-topic", JSON.class, value);
// Add a JSON topic at "my-topic" with no initial value.
// self.errorHandler is passed as the completion handler.
self.session!.topicControl.add(
withTopicPath: "my-topic",
type: PTDiffusionTopicType.JSON,
value: nil,
completionHandler: self.errorHandler)
Now that the topic exists, you can publish data to it. Add the following lines:
// Wrap the dictionary as a PTDiffusionJSON value; `error` is passed as the out-parameter.
// NOTE(review): `error` is never inspected before `value` is used — confirm this is intended.
var error: NSError? = nil
let value = PTDiffusionJSON(object: ["foo" : "bar" ], error: &error)
// Send the value to "my-topic"; self.errorHandler is passed as the completion handler.
self.session!.topicUpdateControl.updater.update(
withTopicPath: "my-topic",
value: value,
completionHandler: self.errorHandler)
// Create the JSON topic "my-topic".
session.feature(TopicControl.class).addTopic("my-topic", TopicType.JSON);

// Build the JSON value; inner quotes must be escaped inside a Java string literal.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\"}");

// Publish the value to the topic.
final CompletableFuture<?> result =
    session.feature(TopicUpdate.class).set("my-topic", JSON.class, value);
/* Serialise a JSON value into a buffer and publish it to "my-topic". */
char *topic_path = "my-topic";
BUF_T *buf = buf_create();

/* Inner quotes must be escaped inside a C string literal. */
write_diffusion_json_value("{\"foo\" : \"bar\"}", buf);

DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T topic_update_params = {
    .topic_path = topic_path,
    .datatype = DATATYPE_JSON,
    .update = buf
};
diffusion_topic_update_set(session, topic_update_params);
# Create the topic, then set its value, through the REST API.
auth_headers = { 'Authorization': 'Bearer ' + token }

requests.post('https://api.diffusion.cloud/topics/add',
              headers=auth_headers,
              json={ 'path': 'sensors/1032952' })

reading = { 'latitude': 44.934, 'longitude': -93.262, "timestamp": 15903912 }
requests.post('https://api.diffusion.cloud/topics/set',
              headers=auth_headers,
              json={ 'value': reading, 'path': 'sensors/1032952' })
// Create the topic through the REST API.
// $token must hold the API bearer token; a bare "Bearer " header carries no credential.
$client = new \GuzzleHttp\Client(["base_uri" => "https://api.diffusion.cloud"]);
$client->post("/topics/add", [
    'headers' => ['Authorization' => 'Bearer ' . $token],
    'json' => [ 'path' => 'sensors/1032952']
]);
// Set the topic value through the REST API.
// $token must hold the API bearer token; a bare "Bearer " header carries no credential.
$client = new \GuzzleHttp\Client(["base_uri" => "https://api.diffusion.cloud"]);
$client->post("/topics/set", [
    'headers' => ['Authorization' => 'Bearer ' . $token],
    'json' => [
        // The comma after this element is required before 'path'.
        'value' => ['latitude' => 44.934, 'longitude' => -93.262, "timestamp" => 15903912],
        'path' => 'sensors/1032952'
    ]
]);
req, _ := http.NewRequest("POST", "https://api.diffusion.cloud/topics/add",
strings.NewReader("{\”path\": \"sensors/1032952\"}"))
req.Header.Add("Authorization", "Bearer ")
req.Header.Add("content-type", "application/json")
client := &http.Client {}
res, err := client.Do(req)
value = strings.NewReader("{\”path\": \"sensors/1032952\", \"value\": {\"latitude\":
44.934, \"longitude\": -93.262, \"timestamp\": 15903912 } }")
req, _ := http.NewRequest("POST", "https://api.diffusion.cloud/topics/set", value)
req.Header.Add("Authorization", "Bearer ")
req.Header.Add("content-type", "application/json")
client := &http.Client {}
res, err := client.Do(req)
# Create the topic through the REST API.
# TOKEN must hold the API bearer token. Each line needs a trailing backslash
# so the shell treats the options as one command.
curl --include \
     --request POST \
     --url https://api.diffusion.cloud/topics/add \
     --header "authorization: Bearer ${TOKEN}" \
     --header 'content-type: application/json' \
     -d '{"path": "sensors/1032952" }'
# Set the topic value through the REST API.
# TOKEN must hold the API bearer token. The JSON body is kept on one line so
# the number -93.262 is not split and both braces are closed.
curl --include \
     --request POST \
     --url https://api.diffusion.cloud/topics/set \
     --header "authorization: Bearer ${TOKEN}" \
     --header 'content-type: application/json' \
     -d '{"path": "sensors/1032952", "value": { "latitude": 44.934, "longitude": -93.262, "timestamp": 15903912 } }'
Request
// Send 'Special offer!' on path 'account/offers' to the sessions selected by the filter.
// The handler logs each response, or a message when a response error is reported.
session.messages.sendRequestToFilter('$Principal is "alice"', 'account/offers', 'Special offer!', {
onResponse : function(sessionID, response) {
console.log("Received response " + response);
},
onResponseError : function() {
console.log("There was an error when receiving the response");
},
...
});
// Send "Special offer!" on path "account/offers" to the sessions selected by the filter.
// The token is initialised explicitly: an unassigned local cannot be passed as an argument.
CancellationToken cancellationToken = CancellationToken.None;
await session.Messaging.SendRequestToFilterAsync(
    "$Principal EQ 'alice'",
    "account/offers",
    "Special offer!",
    new RequestCallback(),
    cancellationToken );

// Logs each response received from a session.
// NOTE(review): any other members the callback interface requires are not shown here — confirm.
private class RequestCallback : IFilteredRequestCallback<string> {
    public void OnResponse( ISessionId sessionId, string response )
        => WriteLine( $"Received response: '{response}'." );
}
session.feature(MessagingControl.class).sendRequestToFilter(
'$Principal is "alice"',
'account/offers',
'Special offer!',
String.class,
String.class,
new FilteredRequestCallback.Default() {
@Override
public void onResponse(SessionId sessionId, String response) {
System.out.println("Recieved response " + response);
}
});
// Build the string request; a nil return is treated as failure.
NSError *requestError;
PTDiffusionRequest *const request =
    [PTDiffusionPrimitive requestWithString:@"Special offer!" error:&requestError];
if (request == nil) {
    NSLog(@"Failed to create request: [%@]", requestError);
} else {
    PTDiffusionSessionResponseStream *const responseStream =
        [PTDiffusionPrimitive stringSessionResponseStreamWithDelegate:self];
    [session.messaging sendRequest:request
                          toFilter:@"$Principal is 'alice'"
                              path:@"account/offers"
                    responseStream:responseStream
                 completionHandler:^(NSUInteger count, NSError * _Nullable error)
    {
        if (error != nil) {
            NSLog(@"Failed to send request to filter: [%@]", error);
            return;
        }
        // NSUInteger varies in width across platforms; cast to match %lu.
        NSLog(@"Received response from %lu total sessions", (unsigned long)count);
    }];
}

#pragma mark - PTDiffusionStringSessionResponseStreamDelegate methods

// Logs each string response delivered to the response stream, with the responding session.
- (void)diffusionStream:(PTDiffusionStream *)stream
    didReceiveResponseWithString:(nullable NSString *)string
                   fromSessionId:(PTDiffusionSessionId *)sessionId {
    NSLog(@"Stream [%@] received response with string [%@] from session [%@]", stream, string, sessionId);
}
session.feature(MessagingControl.class).sendRequestToFilter(
'$Principal is "alice"',
'account/offers',
'Special offer!',
String.class,
String.class,
new FilteredRequestCallback.Default() {
@Override
public void onResponse(SessionId sessionId, String response) {
System.out.println("Recieved response " + response);
}
});
Publisher
// Add a time series topic with a JSON event type, then append 100 events to it.
var timeSeriesSpecification = new TopicSpecification(TopicType.TIME_SERIES, {
    TIME_SERIES_EVENT_VALUE_TYPE : "json",
    TIME_SERIES_RETAINED_RANGE: "last 5h limit 100"
});
session.topics.add('topic/timeseries', timeSeriesSpecification)

for (let n = 0; n < 100; n += 1) {
    session.timeseries.append("topic/timeseries", { value: n }, diffusion.datatypes.json());
}
// Add a time series topic with a JSON event type. The topic type must be
// TIME_SERIES for the TIME_SERIES_* properties to apply, and the add is
// awaited so the topic exists before events are appended.
await session.TopicControl.AddTopicAsync(
    "topic/timeseries",
    session.TopicControl.NewSpecification( TopicType.TIME_SERIES )
        .WithProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, "json")
        .WithProperty(TopicSpecification.TIME_SERIES_RETAINED_RANGE, "last 5h limit 100"));

// Append 100 events, awaiting each so that failures surface here.
for (var i = 0; i < 100; i++ ) {
    var value = Diffusion.DataTypes.JSON.FromJSONString("{\"value\" : " + i + "}");
    await session.TimeSeries.AppendAsync("topic/timeseries", value);
}
// Add a time series topic with a JSON event type.
session.feature(TopicControl.class).addTopic(
    "topic/timeseries",
    Diffusion.newTopicSpecification(TopicType.TIME_SERIES)
        .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, "json")
        .withProperty(TopicSpecification.TIME_SERIES_RETAINED_RANGE, "last 5h limit 100"));

// Append 100 events. The value class is passed as JSON.class, matching the
// set(path, JSON.class, value) call used when publishing.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final TimeSeries timeSeries = session.feature(TimeSeries.class);
for (int i = 0; i < 100; i++) {
    final JSON value = jsonDataType.fromJsonString("{\"value\" : " + i + "}");
    timeSeries.append("topic/timeseries", JSON.class, value);
}
// Specification for a time series topic whose events are JSON values.
PTDiffusionTopicSpecification *const specification =
    [[PTDiffusionTopicSpecification alloc] initWithType:PTDiffusionTopicType_TimeSeries
                                             properties:@{PTDiffusionTopicSpecification.timeSeriesEventValueTypePropertyKey: PTDiffusionDataTypes.json.typeName,
                                                          PTDiffusionTopicSpecification.timeSeriesRetainedRangePropertyKey: @"last 5h limit 100"}];

// Add the topic, then append 100 events once the add has been reported.
[session.topicControl addTopicWithPath:@"topic/timeseries"
                         specification:specification
                     completionHandler:^(PTDiffusionAddTopicResult * _Nullable result, NSError * _Nullable error)
{
    // Without a result there is nothing to append to.
    if (result == nil) {
        NSLog(@"An error occurred while attempting to create topic: [%@]", error);
        return;
    }

    if ([result isEqual:PTDiffusionAddTopicResult.exists]) {
        NSLog(@"Topic already existed");
    } else if ([result isEqual:PTDiffusionAddTopicResult.created]) {
        NSLog(@"Topic created");
    }

    for (int i = 0; i < 100; i++) {
        NSError *jsonError;
        PTDiffusionJSON *const value =
            [[PTDiffusionJSON alloc] initWithJSONString:[NSString stringWithFormat:@"{\"value\" : %d}", i]
                                                  error:&jsonError];
        // Skip events whose JSON value could not be built.
        if (value == nil) {
            NSLog(@"Unable to create JSON value for event %d: [%@]", i, jsonError);
            continue;
        }

        [session.timeSeries appendToTopicPath:@"topic/timeseries"
                                    JSONValue:value
                            completionHandler:^(PTDiffusionTimeSeriesEventMetadata * _Nullable eventMetadata, NSError * _Nullable appendError)
        {
            if (eventMetadata == nil && appendError != nil) {
                NSLog(@"An error occurred while attempting to append to topic path: [%@]", appendError);
            }
        }];
    }
}];
// Add a time series topic with a JSON event type.
session.feature(TopicControl.class).addTopic(
    "topic/timeseries",
    Diffusion.newTopicSpecification(TopicType.TIME_SERIES)
        .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, "json")
        .withProperty(TopicSpecification.TIME_SERIES_RETAINED_RANGE, "last 5h limit 100"));

// Append 100 events. The value class is passed as JSON.class, matching the
// set(path, JSON.class, value) call used when publishing.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final TimeSeries timeSeries = session.feature(TimeSeries.class);
for (int i = 0; i < 100; i++) {
    final JSON value = jsonDataType.fromJsonString("{\"value\" : " + i + "}");
    timeSeries.append("topic/timeseries", JSON.class, value);
}
Response
// Register a request stream for path 'account/offers'.
// Each request is logged and answered with 'Tell me more'.
session.messages.setRequestStream('account/offers', {
onRequest: function(path, request, responder) {
console.log('Received request: ' + request);
responder.respond('Tell me more');
},
...
});
// Register a request stream for path "account/offers".
// C# string literals need double quotes.
session.Messaging.SetRequestStream( "account/offers", new MessageRequestStream() );

// Logs each request and answers it with "Tell me more".
// The request and response types are both string, as OnRequest's signature shows.
private class MessageRequestStream : IRequestStream<string, string> {
    public void OnRequest( string path, string request, IResponder<string> responder ) {
        WriteLine( $"Received request: '{request}'." );
        responder.Respond( "Tell me more" );
    }
}
session.feature(Messaging.class).sendRequestToFilter(
'account/offers',
String.class,
String.class,
new RequestStream.Default() {
@Override
public void onRequest(String path, String request, Messaging.RequestStream.Responder responder) {
System.out.println("Received request: " + request);
responder.respond("Tell me more");
}
});
// Register a string request handler for path "account/offers", with self as its delegate.
PTDiffusionRequestHandler *const handler = [PTDiffusionPrimitive stringRequestHandlerWithDelegate:self];
[session.messaging addRequestHandler:handler
                             forPath:@"account/offers"
                   completionHandler:^(PTDiffusionTopicTreeRegistration * _Nullable registration, NSError * _Nullable error)
{
    // Report a failed registration instead of silently ignoring it.
    if (registration == nil) {
        NSLog(@"Failed to register request handler: [%@]", error);
    }
}];

#pragma mark - PTDiffusionStringRequestDelegate

// Logs each incoming string request and answers it with "Tell me more".
- (void)diffusionTopicTreeRegistration:(nonnull PTDiffusionTopicTreeRegistration *)registration
           didReceiveRequestWithString:(nullable NSString *)string
                               context:(nonnull PTDiffusionRequestContext *)context
                             responder:(nonnull PTDiffusionResponder *)responder {
    NSLog(@"Registration [%@] received request with string [%@]", registration, string);
    // NOTE(review): the outcome of respondWithString:error: is not inspected — confirm whether failures should be logged.
    NSError *error;
    [responder respondWithString:@"Tell me more" error:&error];
}
session.feature(Messaging.class).sendRequestToFilter(
'account/offers',
String.class,
String.class,
new RequestStream.Default() {
@Override
public void onRequest(String path, String request, Messaging.RequestStream.Responder responder) {
System.out.println("Received request: " + request);
responder.respond("Tell me more");
}
});
Subscribe
// Register a JSON value stream for 'my-topic' that logs every new value.
session.addStream('my-topic', diffusion.datatypes.json()).on('value', function(topic, specification, newValue, oldValue) {
    // The callback parameter is `topic`; stringify the value so objects are printed in full.
    console.log(`New value for ${topic}: ${JSON.stringify(newValue.get())}`);
});

// Subscribe outside the callback: the callback only runs once values arrive.
session.select('my-topic');
// Value stream for JSON topics that logs subscription changes and new values.
// The stream is typed on IJSON to match the OnValue signature below.
class ExampleValueStream : IValueStream<IJSON>
{
    // Logs the path of a topic the session has been subscribed to.
    public void OnSubscription(string topicPath, ITopicSpecification specification)
    {
        Console.WriteLine($"Subscribed to: {topicPath}");
    }

    // Logs the path of a topic the session has been unsubscribed from.
    public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
    {
        Console.WriteLine($"Unsubscribed from: {topicPath}");
    }

    // Logs the new value of a topic as a JSON string.
    public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
    {
        Console.WriteLine($"{topicPath}: {newValue.ToJSONString()}");
    }

    public void OnClose()
    {
        // Not used
    }

    public void OnError(ErrorReason errorReason)
    {
        // Not used
    }
}

// Register the stream, then subscribe, both through the same Topics feature.
session.Topics.AddStream("my-topic", new ExampleValueStream());
await session.Topics.SubscribeAsync("my-topic");
// Register a JSON value stream for "my-topic" (the topic the publish examples
// write to) that logs every new value, then subscribe to it.
session.feature(Topics.class).addStream(
    "my-topic",
    JSON.class,
    new Topics.ValueStream.Default<JSON>() {
        @Override
        public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue,
            JSON newValue) {
            System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
        }
    });
session.feature(Topics.class).subscribe("my-topic");
/// JSON value stream delegate that prints subscriptions and topic updates.
class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate {
    /// Prints the path of a topic the stream has been subscribed to.
    func diffusionStream(
        _ stream: PTDiffusionStream,
        didSubscribeToTopicPath topicPath: String,
        specification: PTDiffusionTopicSpecification) {
        print("Subscribed to: \(topicPath)")
    }

    /// Prints the new value of a topic when it is a JSON object.
    func diffusionStream(
        _ stream: PTDiffusionValueStream,
        didUpdateTopicPath topicPath: String,
        specification: PTDiffusionTopicSpecification,
        oldJSON: PTDiffusionJSON?,
        newJSON: PTDiffusionJSON) {
        do {
            // A conditional cast with explicit types avoids a crash when the
            // value is not a JSON object.
            guard let value = try newJSON.object() as? [String: Any] else {
                print("Unable to read message")
                return
            }
            print("\(topicPath): \(value.description)")
        } catch {
            print("Unable to read message")
        }
    }
}
// Register a JSON value stream for "my-topic" (the topic the publish examples
// write to) that logs every new value, then subscribe to it.
session.feature(Topics.class).addStream(
    "my-topic",
    JSON.class,
    new Topics.ValueStream.Default<JSON>() {
        @Override
        public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue,
            JSON newValue) {
            System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
        }
    });
session.feature(Topics.class).subscribe("my-topic");
/*
 * Value stream callback: prints the new value of a topic as a JSON string.
 * Always returns HANDLER_SUCCESS.
 */
static int on_value(const char *const topic_path,
                    const TOPIC_SPECIFICATION_T *const specification,
                    DIFFUSION_DATATYPE datatype,
                    const DIFFUSION_VALUE_T *const old_value,
                    const DIFFUSION_VALUE_T *const new_value,
                    void *context)
{
    /* Start from NULL so a failed conversion never prints or frees garbage. */
    char *result = NULL;
    to_diffusion_json_string(new_value, &result, NULL);
    if (result != NULL) {
        printf("New value for %s: %s \n", topic_path, result);
        free(result);
    }
    return HANDLER_SUCCESS;
}
/* Register the JSON value stream for the selector, then subscribe to it. */
char *topic_selector = ">my-topic";

VALUE_STREAM_T value_stream = {
    .datatype = DATATYPE_JSON,
    .on_value = on_value
};
add_stream(session, topic_selector, &value_stream);

SUBSCRIPTION_PARAMS_T params = {
    .topic_selector = topic_selector
};
subscribe(session, params);
# Fetch the topics matching the selector through the REST API and print the results.
response = requests.post('https://api.diffusion.cloud/topics/fetch',
                         headers={ 'Authorization': 'Bearer ' + token },
                         json={ 'selector': '?sensors/.*' })
print(response.json()['results'])
// Fetch the topics matching the selector through the REST API and print the results.
// $token must hold the API bearer token; a bare "Bearer " header carries no credential.
$client = new \GuzzleHttp\Client(["base_uri" => "https://api.diffusion.cloud"]);
$response = $client->post("/topics/fetch", [
    'headers' => ['Authorization' => 'Bearer ' . $token],
    'json' => [ 'selector' => '?sensors/.*']
]);
// Decode to an associative array so the result can be indexed with ['results'].
$body = json_decode((string) $response->getBody(), true);
print_r($body['results']);
req, _ := http.NewRequest("POST", "https://api.diffusion.cloud/topics/fetch",
strings.NewReader("{\selector\": \"?sensors/.*\"}"))
req.Header.Add("Authorization", "Bearer ")
req.Header.Add("content-type", "application/json")
client := &http.Client {}
res, err := client.Do(req)
# Fetch the topics matching the selector through the REST API.
# TOKEN must hold the API bearer token. Each line needs a trailing backslash
# so the shell treats the options as one command.
curl --include \
     --request POST \
     --url https://api.diffusion.cloud/topics/fetch \
     --header "authorization: Bearer ${TOKEN}" \
     --header 'content-type: application/json' \
     -d '{"selector": "?sensors/.*" }'
Subscriber
// Query the first 10 events of the time series topic and log each one.
// The query result is read through the same name it was assigned to.
const result = await session.timeseries.rangeQuery()
    .as(diffusion.datatypes.json())
    .fromStart()
    .limit(10)
    .selectFrom("topic/timeseries");

result.events.forEach(event => {
    console.log("Got historic event: ", JSON.stringify(event));
});
// Query the first 10 events of the time series topic and print each one.
var result = await session.TimeSeries.RangeQuery
    .As<IJSON>()
    .FromStart()
    .Limit(10)
    .SelectFromAsync("topic/timeseries");

// `event` is a reserved word in C#, so the loop variable uses another name.
foreach (var historicEvent in result.Events) {
    Console.WriteLine($"Got historic event: {historicEvent}");
}
// Query the first 10 events of the time series topic and print each one.
session.feature(TimeSeries.class)
    .rangeQuery()
    .as(JSON.class)
    .fromStart()
    .limit(10)
    .selectFrom("topic/timeseries")
    .thenAccept(queryResult ->
        queryResult.stream().forEach(historicEvent -> {
            System.out.println("Got historic event");
            System.out.println(historicEvent);
        }));
// Range query limited to 10 events from the start of the time series.
PTDiffusionTimeSeriesRangeQuery *const query = [[[PTDiffusionTimeSeriesRangeQuery new] fromStart] limitWithCount:10];

// Evaluate the query against the topic and log either the result or the error.
[session.timeSeries evaluateQuery:query
                      atTopicPath:@"topic/timeseries"
            JSONCompletionHandler:^(PTDiffusionJSONTimeSeriesQueryResult * _Nullable result, NSError * _Nullable error)
{
    if (result == nil || error != nil) {
        NSLog(@"An error occurred while attempting to retrieve the historic value of the time series topic: [%@]", error);
        return;
    }
    NSLog(@"Historic value: [%@]", result);
}];
// Query the first 10 events of the time series topic and print each one.
session.feature(TimeSeries.class)
    .rangeQuery()
    .as(JSON.class)
    .fromStart()
    .limit(10)
    .selectFrom("topic/timeseries")
    .thenAccept(queryResult ->
        queryResult.stream().forEach(historicEvent -> {
            System.out.println("Got historic event");
            System.out.println(historicEvent);
        }));