IBM Streams Event Streams integration in IBM Streams topology applications
Overview
IBM® Event Streams is a fully managed, cloud-based messaging service. It is built on Apache Kafka and is available through IBM Cloud® as a Service.
This module allows a Streams application to subscribe to a message queue as a stream and to publish messages to a queue from a stream of tuples.
Credentials
Event Streams credentials are defined either in a Streams application configuration or by passing the Event Streams service credentials JSON directly, as a dict, to the credentials parameter of the functions.
Credentials Sample:
import json
import streamsx.eventstreams as eventstreams
from streamsx.topology.schema import CommonSchema
from streamsx.topology.topology import Topology

topology = Topology('CredentialsExample')
myCredentialsJsonString = """
{
your_credentials_json_formatted
in multiple lines
}
"""
myCredentials = json.loads(myCredentialsJsonString)
stream = eventstreams.subscribe(topology, topic="topic",
    schema=CommonSchema.String, credentials=myCredentials)
By default, an application configuration named eventstreams is used. A different configuration can be specified using the credentials parameter of subscribe() or publish().
The application configuration must contain the property eventstreams.creds
with a value of the raw Event Streams service credentials JSON.
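As a rough illustration of the property layout described above, the following sketch builds the properties of an application configuration from service credentials. The helper name build_app_config_properties and the credential field values are placeholders invented for this example; only the property name eventstreams.creds comes from the text above.

```python
import json

# Property name that the application configuration must contain.
CREDS_PROPERTY = 'eventstreams.creds'


def build_app_config_properties(credentials):
    """Return the properties for an application configuration.

    Hypothetical helper: accepts the service credentials either as a dict
    or as a JSON string and stores them as raw JSON text.
    """
    if isinstance(credentials, dict):
        raw_json = json.dumps(credentials)
    else:
        # Check that the string really is a JSON object before storing it.
        parsed = json.loads(credentials)
        if not isinstance(parsed, dict):
            raise ValueError('credentials must be a JSON object')
        raw_json = credentials
    return {CREDS_PROPERTY: raw_json}


# Placeholder values only; real service credentials contain more fields.
example_credentials = {'api_key': 'your-api-key', 'user': 'token'}
properties = build_app_config_properties(example_credentials)
```

Storing the credentials as one JSON string keeps the configuration to a single property, which matches the requirement stated above.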
Messages
The schema of the stream defines how messages are handled.
CommonSchema.String - Each message is a UTF-8 encoded string.
CommonSchema.Json - Each message is a UTF-8 encoded serialized JSON object.
StringMessage - structured schema with message and key
BinaryMessage - structured schema with message and key
StringMessageMeta - structured schema with message, key, and message meta data
BinaryMessageMeta - structured schema with message, key, and message meta data
No other formats are supported.
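To make the first two formats concrete, the sketch below shows what a message payload looks like as bytes for CommonSchema.String and CommonSchema.Json, using only the Python standard library. It does not call the Event Streams API, and the sample values are made up.

```python
import json

# CommonSchema.String: each message is a UTF-8 encoded string.
text_tuple = 'Hello World!'
string_payload = text_tuple.encode('utf-8')

# CommonSchema.Json: each message is a UTF-8 encoded serialized JSON object.
json_tuple = {'sensor_id': 'sensor_1', 'value': 21.5}
json_payload = json.dumps(json_tuple).encode('utf-8')

# Decoding reverses both steps on the consuming side.
decoded_text = string_payload.decode('utf-8')
decoded_json = json.loads(json_payload.decode('utf-8'))
```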
Sample
A simple hello world example of a Streams application publishing to a topic and the same application consuming the same topic:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
import streamsx.eventstreams as eventstreams
import time

def delay(v):
    time.sleep(5.0)
    return True

topology = Topology('EventStreamsHelloWorld')
to_evstr = topology.source(['Hello', 'World!'])
to_evstr = to_evstr.as_string()
# delay tuple by tuple
to_evstr = to_evstr.filter(delay)
# Publish a stream to Event Streams using HELLO topic
eventstreams.publish(to_evstr, topic='HELLO')
# Subscribe to same topic as a stream
from_evstr = eventstreams.subscribe(topology, schema=CommonSchema.String, topic='HELLO')
# You'll find the Hello World! in stdout log file:
from_evstr.print()
submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topology)
streamsx.eventstreams.download_toolkit(url=None, target_dir=None)
Downloads the latest streamsx.messagehub toolkit from GitHub.
Example for updating the toolkit for your topology with the latest toolkit from GitHub:
import streamsx.eventstreams as es
import streamsx.spl.toolkit
# download the toolkit from GitHub
toolkit_location = es.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, toolkit_location)
Example for updating the topology with a specific version of the streamsx.messagehub toolkit using a URL:
import streamsx.eventstreams as es
import streamsx.spl.toolkit
url202 = 'https://github.com/IBMStreams/streamsx.messagehub/releases/download/v2.0.2/com.ibm.streamsx.messagehub-2.0.2.tgz'
toolkit_location = es.download_toolkit(url=url202)
streamsx.spl.toolkit.add_toolkit(topology, toolkit_location)
- Parameters
url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
target_dir (str) – The directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
- Returns
the location of the downloaded streamsx.messagehub toolkit
- Return type
str
Note
This function requires an outgoing Internet connection.
New in version 1.3.
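The handling of a relative target_dir described above can be pictured with the standard library alone. The helper resolve_target_dir below is hypothetical and is not the library's code; it only mirrors the documented rule, and it assumes that an absolute path is used unchanged.

```python
import os
import tempfile


def resolve_target_dir(target_dir):
    """Illustrate the documented handling of a non-None target_dir.

    Hypothetical helper, not part of streamsx.eventstreams.
    """
    if os.path.isabs(target_dir):
        # Assumption: an absolute path is used as given.
        return target_dir
    # A relative path is appended to the system temporary directory.
    return os.path.join(tempfile.gettempdir(), target_dir)


relative_result = resolve_target_dir('toolkits/messagehub')
absolute_result = resolve_target_dir(os.path.abspath('toolkits'))
```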
streamsx.eventstreams.configure_connection(instance, name='eventstreams', credentials=None)
Configures IBM Streams for a certain connection.
Creates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
from icpd_core import icpd_util
from streamsx.rest_primitives import Instance
from streamsx.topology.context import ConfigParams
import streamsx.eventstreams as es

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = es.configure_connection(instance, credentials='my_credentials_json')
- Parameters
instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
name (str) – Name of the application configuration, default name is ‘eventstreams’.
credentials (str|dict) – The service credentials for Event Streams.
- Returns
Name of the application configuration.
Warning
The function can be used only in IBM Cloud Pak for Data.
New in version 1.1.
streamsx.eventstreams.subscribe(topology, topic, schema, group=None, credentials=None, name=None)
Subscribe to messages from Event Streams (Message Hub) for a topic.
Adds an Event Streams consumer that subscribes to a topic and converts each consumed message to a stream tuple.
- Parameters
topology (Topology) – Topology that will contain the stream of messages.
topic (str) – Topic to subscribe messages from.
schema (StreamSchema) – Schema for returned stream.
group (str) – Kafka consumer group identifier. When not specified, it defaults to the job name with the topic appended, separated by an underscore, so that multiple subscribe calls with the same topic in one topology automatically form a consumer group.
credentials (dict|str) – Credentials in JSON or name of the application configuration containing the credentials for the Event Streams service. When set to None, the application configuration eventstreams is used.
name (str) – Consumer name in the Streams context, defaults to a generated name.
- Returns
Stream containing messages.
- Return type
Stream
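The default for group can be pictured with a small sketch. The helper default_consumer_group and the job name used below are invented for illustration; the actual job name is only known at runtime, and the library's own implementation may differ.

```python
def default_consumer_group(job_name, topic):
    """Hypothetical helper mirroring the documented default for group."""
    return job_name + '_' + topic


# Two subscribe calls for the same topic in one job get the same default,
# which is why they end up in one consumer group.
group_a = default_consumer_group('EventStreamsHelloWorld_42', 'HELLO')
group_b = default_consumer_group('EventStreamsHelloWorld_42', 'HELLO')
group_other = default_consumer_group('EventStreamsHelloWorld_42', 'OTHER')
```

Passing an explicit group is the way to share a consumer group across different jobs, since the default is tied to the job name.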
streamsx.eventstreams.publish(stream, topic, credentials=None, name=None)
Publish Event Streams messages to a topic.
Adds an Event Streams producer where each tuple on stream is published as a message into IBM Event Streams cloud service.
- Parameters
stream (Stream) – Stream of tuples to be published as messages.
topic (str) – Topic to publish messages to.
credentials (dict|str) – Credentials in JSON or name of the application configuration containing the credentials for the Event Streams service. When set to None, the application configuration eventstreams is used.
name (str) – Producer name in the Streams context, defaults to a generated name.
- Returns
Stream termination.
- Return type
streamsx.topology.topology.Sink
Schemas for streams created with the subscribe() method, and usable for streams terminated with publish(). All of these message types are keyed messages.
class streamsx.eventstreams.schema.Schema
Bases: object
Structured stream schemas for keyed messages for subscribe(), and for streams that are published by publish() to an Event Streams topic.
The schemas StringMessage and BinaryMessage have the attributes message and key. They vary in the type for the message attribute and can be used for subscribe() and for the stream published with publish().
The schemas StringMessageMeta and BinaryMessageMeta have the attributes message, key, topic, partition, offset, and messageTimestamp. They vary in the type for the message attribute and can be used for subscribe() and publish().
All schemas defined in this class are instances of streamsx.topology.schema.StreamSchema.
The following sample uses structured schemas for publishing messages with keys to a potentially partitioned topic in Event Streams. Then, it creates a consumer group that subscribes to the topic, and processes the received messages in parallel channels partitioned by the message key:
from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes
from streamsx.topology.topology import Routing
from streamsx.topology.schema import StreamSchema
from streamsx.eventstreams.schema import Schema
import streamsx.eventstreams as evst
import random
import time
import json
from datetime import datetime


# Define a callable source for data that we push into Event Streams
class SensorReadingsSource(object):
    def __call__(self):
        # This is just an example of using generated data,
        # Here you could connect to db, generate data,
        # connect to data set, open file, ...
        i = 0
        # wait until the consumer is ready before we start creating data
        time.sleep(20.0)
        while i < 10000:
            time.sleep(0.01)  # 100 per second
            i = i + 1
            sensor_id = random.randint(1, 100)
            reading = {}
            reading["sensor_id"] = "sensor_" + str(sensor_id)
            reading["value"] = random.random() * 3000
            reading["ts"] = int(datetime.now().timestamp())
            yield reading


# parses the JSON in the message and adds the attributes to a tuple
def flat_message_json(tup):
    messageAsDict = json.loads(tup['message'])
    tup.update(messageAsDict)
    return tup


# calculate a hash code of a string in a consistent way
# needed for partitioned parallel streams
def string_hashcode(s):
    h = 0
    for c in s:
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    return ((h + 0x80000000) & 0xFFFFFFFF) - 0x80000000


topology = Topology('EventStreamsParallel')

#
# the producer part
#
# create the data and map them to the attributes 'message' and 'key' of the
# 'Schema.StringMessage' schema for Kafka, so that we have messages with keys
sensorStream = topology.source(
    SensorReadingsSource(),
    "RawDataSource"
).map(
    func=lambda reading: {'message': json.dumps(reading),
                          'key': reading['sensor_id']},
    name="ToKeyedMessage",
    schema=Schema.StringMessage)
# assume, we have created an application configuration with name 'eventstreams'
eventStreamsSink = evst.publish(
    sensorStream,
    topic="threePartitionTopic1",
    credentials='eventstreams',
    name="SensorPublish")

#
# the consumer side
#
# subscribe, create a consumer group with 3 consumers
consumerSchema = Schema.StringMessageMeta
received = evst.subscribe(
    topology,
    topic="threePartitionTopic1",
    schema=consumerSchema,
    group='my_consumer_group',
    credentials='eventstreams',
    name="SensorSubscribe"
).set_parallel(3).end_parallel()

# start a different parallel region partitioned by message key,
# so that each key always goes into the same parallel channel
receivedParallelPartitioned = received.parallel(
    5,
    routing=Routing.HASH_PARTITIONED,
    func=lambda x: string_hashcode(x['key']))

# schema extension, here we use the Python 2.7, 3 way
flattenedSchema = consumerSchema.extend(
    StreamSchema('tuple<rstring sensor_id, float64 value, int64 ts>'))

receivedParallelPartitionedFlattened = receivedParallelPartitioned.map(
    func=flat_message_json,
    name='JSON2Attributes',
    schema=flattenedSchema)

# validate by removing negative and zero values from the streams,
# pass only positive values and timestamps
receivedValidated = receivedParallelPartitionedFlattened.filter(
    lambda tup: (tup['value'] > 0) and (tup['ts'] > 0),
    name='Validate')

# end parallel processing and print the combined streams to stdout log
receivedValidated.end_parallel().print()

submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topology)
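The sample defines its own string_hashcode rather than relying on Python's built-in hash(), whose result for strings is randomized per interpreter process by default and therefore cannot give a stable mapping from key to channel across processes. The sketch below repeats that function and shows that equal keys always yield the same signed 32-bit value. The modulo-based channel selection in illustrative_channel is an assumption made only for this example; how Streams maps hash values to parallel channels is not described in this document.

```python
def string_hashcode(s):
    """Deterministic signed 32-bit hash of a string (same as in the sample)."""
    h = 0
    for c in s:
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    # Fold the unsigned value into the signed 32-bit range.
    return ((h + 0x80000000) & 0xFFFFFFFF) - 0x80000000


def illustrative_channel(key, width):
    """Assumed mapping for illustration: hash modulo the parallel width."""
    return string_hashcode(key) % width


keys = ['sensor_1', 'sensor_2', 'sensor_1']
channels = [illustrative_channel(k, 5) for k in keys]
```

Because the hash depends only on the characters of the key, both occurrences of sensor_1 land on the same illustrative channel.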
BinaryMessage = <streamsx.topology.schema.StreamSchema object>
Stream schema with message and key, where the message is a binary object (sequence of bytes), and the key is a string.
The schema defines the following attributes:
message(bytes) - the message content
key(str) - the key for partitioning
This schema can be used both for subscribe() and for streams that are published by publish().
New in version 1.2.
BinaryMessageMeta = <streamsx.topology.schema.StreamSchema object>
Stream schema with message, key, and message meta data, where the message is a binary object (sequence of bytes), and the key is a string. This schema can be used for subscribe().
The schema defines the following attributes:
message(bytes) - the message content
key(str) - the key for partitioning
topic(str) - the Event Streams topic
partition(int) - the topic partition number (32 bit)
offset(int) - the offset of the message within the topic partition (64 bit)
messageTimestamp(int) - the message timestamp in milliseconds since epoch (64 bit)
New in version 1.2.
StringMessage = <streamsx.topology.schema.StreamSchema object>
Stream schema with message and key, both being strings.
The schema defines the following attributes:
message(str) - the message content
key(str) - the key for partitioning
This schema can be used both for subscribe() and for streams that are published by publish().
New in version 1.2.
StringMessageMeta = <streamsx.topology.schema.StreamSchema object>
Stream schema with message, key, and message meta data, where both message and key are strings. This schema can be used for subscribe().
The schema defines the following attributes:
message(str) - the message content
key(str) - the key for partitioning
topic(str) - the Event Streams topic
partition(int) - the topic partition number (32 bit)
offset(int) - the offset of the message within the topic partition (64 bit)
messageTimestamp(int) - the message timestamp in milliseconds since epoch (64 bit)
New in version 1.2.
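Since messageTimestamp is given in milliseconds since epoch, a consumer will often want to turn it into a datetime. The sketch below assumes that a tuple arrives as a dict keyed by the attribute names listed above; the helper message_time and the sample values are made up for illustration.

```python
from datetime import datetime, timezone


def message_time(tup):
    """Convert the messageTimestamp attribute (ms since epoch) to UTC."""
    return datetime.fromtimestamp(tup['messageTimestamp'] / 1000.0,
                                  tz=timezone.utc)


# Made-up tuple shaped like StringMessageMeta; values are illustrative only.
sample = {
    'message': '{"value": 1}',
    'key': 'sensor_1',
    'topic': 'HELLO',
    'partition': 0,
    'offset': 17,
    'messageTimestamp': 1500000000000,
}
received_at = message_time(sample)
```

Using an explicit UTC time zone avoids results that depend on the local time zone of the host running the application.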
-