/*
* Subscribe to topics.
*/
/* Create a session factory and set the principal and password on it. */
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "admin");
diffusion_session_factory_password(session_factory, "");
/*
 * Create the session from the factory and the given URL.
 * NOTE(review): "ws://::" and the empty password look like placeholders;
 * confirm the real values before use.
 * NOTE(review): the returned session is used later without a check here;
 * confirm whether session creation can fail and return NULL.
 */
SESSION_T *session = session_create_with_session_factory(session_factory, "ws://::");
/*
 * Subscription callback: prints the path of the topic that was subscribed to.
 *
 * topic_path     path printed in the message.
 * specification  unused.
 * context        unused.
 *
 * Always returns HANDLER_SUCCESS.
 */
static int on_subscription(const char* topic_path,
                           const TOPIC_SPECIFICATION_T *specification,
                           void *context)
{
    (void)specification;
    (void)context;

    fprintf(stdout, "Subscribed to topic: %s\n", topic_path);

    return HANDLER_SUCCESS;
}
/*
 * Unsubscription callback: prints the path of the topic that was
 * unsubscribed from.
 *
 * topic_path     path printed in the message.
 * specification  unused.
 * reason         unused; the reason is not reported.
 * context        unused.
 *
 * Always returns HANDLER_SUCCESS.
 */
static int on_unsubscription(const char* topic_path,
                             const TOPIC_SPECIFICATION_T *specification,
                             NOTIFY_UNSUBSCRIPTION_REASON_T reason,
                             void *context)
{
    (void)specification;
    (void)reason;
    (void)context;

    fprintf(stdout, "Unsubscribed from topic: %s\n", topic_path);

    return HANDLER_SUCCESS;
}
/*
 * Value callback: converts the new value to a JSON string and prints it.
 *
 * topic_path     unused.
 * specification  unused.
 * datatype       unused.
 * old_value      unused.
 * new_value      value passed to to_diffusion_json_string().
 * context        unused.
 *
 * Returns HANDLER_SUCCESS when the conversion succeeds, otherwise prints the
 * API error description, frees the error and returns HANDLER_FAILURE.
 */
static int on_value(const char* topic_path,
                    const TOPIC_SPECIFICATION_T *const specification,
                    const DIFFUSION_DATATYPE datatype,
                    const DIFFUSION_VALUE_T *const old_value,
                    const DIFFUSION_VALUE_T *const new_value,
                    void *context)
{
    /*
     * api_error must be declared before the conversion call, because its
     * address is passed in so the call can report a failure through it.
     */
    DIFFUSION_API_ERROR api_error;
    char *result = NULL;

    bool success = to_diffusion_json_string(new_value, &result, &api_error);
    if (success) {
        printf("Received value: %s\n", result);
        /* The string is released here with free() once it has been printed. */
        free(result);
        return HANDLER_SUCCESS;
    }

    printf("Error during diffusion value read: %s\n", get_diffusion_api_error_description(api_error));
    diffusion_api_error_free(api_error);
    return HANDLER_FAILURE;
}
/*
 * Close callback: prints a one-line notice that the value stream has closed.
 */
static void on_close()
{
    /* puts() appends the trailing newline itself. */
    puts("Value stream closed");
}
/*
 * Value stream with datatype DATATYPE_JSON, wired to the on_subscription,
 * on_unsubscription, on_value and on_close callbacks.
 */
VALUE_STREAM_T value_stream = {
.datatype = DATATYPE_JSON,
.on_subscription = on_subscription,
.on_unsubscription = on_unsubscription,
.on_value = on_value,
.on_close = on_close
};
/* Add the stream to the session for "my-topic"; the stream is passed by address. */
add_stream(session, "my-topic", &value_stream);
/* Subscription parameters: only the topic selector is set, to "my-topic". */
SUBSCRIPTION_PARAMS_T params = {
.topic_selector = "my-topic"
};
/* Subscribe on the session; params is passed by value. */
subscribe(session, params);