IBM Streams Event Streams integration¶
Overview¶
IBM® Event Streams is a fully managed, cloud-based messaging service. It is built on Apache Kafka and is available through IBM Cloud® as a Service.
This module allows a Streams application to subscribe
a
message queue as a stream and publish
messages on a queue from a stream
of tuples.
Credentials¶
Event Streams credentials are defined using a Streams application configuration or setting the Event Streams service credentials JSON directly to the credentials
parameter of the functions.
By default an application configuration named messagehub is used,
a different configuration can be specified using the credentials
parameter to subscribe()
or publish()
.
The application configuration must contain the property messagehub.creds
with a value of the raw Event Streams service credentials JSON.
Messages¶
The schema of the stream defines how messages are handled.
CommonSchema.String
- Each message is a UTF-8 encoded string.CommonSchema.Json
- Each message is a UTF-8 encoded serialized JSON object.
No other formats are supported.
Sample¶
A simple hello world example of a Streams application publishing to a topic and the same application consuming the same topic:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.eventstreams as eventstreams
import time
def delay (v):
time.sleep (5.0)
return True
topo = Topology ('EventStreamsHelloWorld')
to_evstr = topo.source (['Hello', 'World!'])
to_evstr = to_evstr.as_string()
# delay tuple by tuple
to_evstr = to_evstr.filter (delay)
# Publish a stream to Event Streams using HELLO topic
eventstreams.publish (to_evstr, topic='HELLO')
# Subscribe to same topic as a stream
from_evstr = eventstreams.subscribe (topo, schema=CommonSchema.String, topic='HELLO')
# You'll find the Hello World! in stdout log file:
from_evstr.print()
submit ('STREAMING_ANALYTICS_SERVICE', topo)
-
streamsx.eventstreams.
subscribe
(topology, topic, schema, group=None, credentials=None, name=None)¶ Subscribe to messages from Event Streams (Message Hub) for a topic.
Adds an Event Streams consumer that subscribes to a topic and converts each consumed message to a stream tuple.
Parameters: - topology (Topology) – Topology that will contain the stream of messages.
- topic (str) – Topic to subscribe messages from.
- schema (StreamSchema) – Schema for returned stream.
- group (str) – Kafka consumer group identifier. When not specified it default to the job name with topic appended separated by an underscore.
- credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for the Event Streams service. When set to
None
the application configurationmessagehub
is used. - name (str) – Consumer name in the Streams context, defaults to a generated name.
Returns: Stream containing messages.
Return type: Stream
-
streamsx.eventstreams.
publish
(stream, topic, credentials=None, name=None)¶ Publish Event Streams (Message Hub) messages to a topic.
Adds an Event Streams producer where each tuple on stream is published as a stream message.
Parameters: - stream (Stream) – Stream of tuples to published as messages.
- topic (str) – Topic to publish messages to.
- credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for the Event Streams service. When set to
None
the application configurationmessagehub
is used. - name (str) – Producer name in the Streams context, defaults to a generated name.
Returns: Stream termination.
Return type: streamsx.topology.topology.Sink