IBM Streams Event Streams integration

Overview

IBM® Event Streams is a fully managed, cloud-based messaging service. It is built on Apache Kafka and is available as a service through IBM Cloud®.

This module allows a Streams application to subscribe to a message topic as a stream and to publish messages to a topic from a stream of tuples.

Credentials

Event Streams credentials are defined using a Streams application configuration or by passing the Event Streams service credentials JSON directly as the credentials parameter of the functions.

By default an application configuration named messagehub is used; a different configuration can be specified using the credentials parameter of subscribe() or publish().

The application configuration must contain the property messagehub.creds with a value of the raw Event Streams service credentials JSON.
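
For illustration, the sketch below shows both ways of supplying credentials. The configuration name my_eventstreams_config and the file name eventstreams-credentials.json are assumptions chosen for this example:

import json

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
import streamsx.eventstreams as eventstreams

topo = Topology('CredentialsExample')

# Option 1: name of a Streams application configuration that holds
# the messagehub.creds property (the default name is 'messagehub'):
msgs = eventstreams.subscribe(topo, topic='HELLO',
                              schema=CommonSchema.String,
                              credentials='my_eventstreams_config')

# Option 2: the raw service credentials JSON as a dict, here loaded
# from a local file (hypothetical file name):
with open('eventstreams-credentials.json') as f:
    creds = json.load(f)
msgs = eventstreams.subscribe(topo, topic='HELLO',
                              schema=CommonSchema.String,
                              credentials=creds)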

Messages

The schema of the stream defines how messages are handled.

  • CommonSchema.String - Each message is a UTF-8 encoded string.
  • CommonSchema.Json - Each message is a UTF-8 encoded serialized JSON object.

No other formats are supported.
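
As a sketch of the JSON case, Python objects can be declared as a JSON stream with as_json() before publishing; the topic name EVENTS is an assumption for this example:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
import streamsx.eventstreams as eventstreams

topo = Topology('JsonMessagesExample')

# Each dict is sent as a UTF-8 encoded serialized JSON message.
events = topo.source([{'id': 1, 'value': 42}, {'id': 2, 'value': 7}])
events = events.as_json()

eventstreams.publish(events, topic='EVENTS')

# Subscribing with CommonSchema.Json yields dict tuples again.
received = eventstreams.subscribe(topo, topic='EVENTS',
                                  schema=CommonSchema.Json)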

Sample

A simple "Hello World!" example of a Streams application that publishes to a topic and consumes the same topic within the same application:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.eventstreams as eventstreams
import time

def delay(v):
    # Slow the stream down so the tuples are easy to follow in the log.
    time.sleep(5.0)
    return True

topo = Topology('EventStreamsHelloWorld')

to_evstr = topo.source(['Hello', 'World!'])
to_evstr = to_evstr.as_string()
# delay tuple by tuple
to_evstr = to_evstr.filter(delay)

# Publish a stream to Event Streams using the HELLO topic
eventstreams.publish(to_evstr, topic='HELLO')

# Subscribe to the same topic as a stream
from_evstr = eventstreams.subscribe(topo, schema=CommonSchema.String, topic='HELLO')

# You'll find 'Hello' and 'World!' in the stdout log file:
from_evstr.print()

submit('STREAMING_ANALYTICS_SERVICE', topo)

streamsx.eventstreams.subscribe(topology, topic, schema, group=None, credentials=None, name=None)

Subscribe to messages from Event Streams (Message Hub) for a topic.

Adds an Event Streams consumer that subscribes to a topic and converts each consumed message to a stream tuple.

Parameters:
  • topology (Topology) – Topology that will contain the stream of messages.
  • topic (str) – Topic to subscribe messages from.
  • schema (StreamSchema) – Schema for returned stream.
  • group (str) – Kafka consumer group identifier. When not specified, it defaults to the job name with the topic appended, separated by an underscore.
  • credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for the Event Streams service. When set to None the application configuration messagehub is used.
  • name (str) – Consumer name in the Streams context, defaults to a generated name.
Returns:

Stream containing messages.

Return type:

Stream
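
A minimal usage sketch; the topic, group, and name values are assumptions chosen for this example. Subscribers in different Kafka consumer groups each receive every message on the topic:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
import streamsx.eventstreams as eventstreams

topo = Topology('SubscribeExample')

# Two independent consumers of the same topic; distinct consumer
# groups ensure each one sees all messages.
audit = eventstreams.subscribe(topo, topic='HELLO',
                               schema=CommonSchema.String,
                               group='audit', name='AuditConsumer')
billing = eventstreams.subscribe(topo, topic='HELLO',
                                 schema=CommonSchema.String,
                                 group='billing', name='BillingConsumer')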

streamsx.eventstreams.publish(stream, topic, credentials=None, name=None)

Publish Event Streams (Message Hub) messages to a topic.

Adds an Event Streams producer where each tuple on the stream is published as a message.

Parameters:
  • stream (Stream) – Stream of tuples to be published as messages.
  • topic (str) – Topic to publish messages to.
  • credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for the Event Streams service. When set to None the application configuration messagehub is used.
  • name (str) – Producer name in the Streams context, defaults to a generated name.
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink
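
A minimal usage sketch, assuming an application configuration named my_eventstreams_config; the returned Sink marks the termination of the stream:

from streamsx.topology.topology import Topology
import streamsx.eventstreams as eventstreams

topo = Topology('PublishExample')

msgs = topo.source(['a', 'b', 'c']).as_string()

# Publish using credentials from a named application configuration.
sink = eventstreams.publish(msgs, topic='HELLO',
                            credentials='my_eventstreams_config',
                            name='HelloProducer')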
