Mobito.io

Consuming Data via Kafka Streaming

Introduction

This document provides instructions on how to consume messages streamed from Mobito to a Kafka topic hosted on AWS MSK.

The example below uses Python to set up a KafkaConsumer.

The packages required are:

kafka-python — Kafka client for Python

aws-msk-iam-sasl-signer-python — IAM-based authentication for AWS MSK

Documentation:

Usage — kafka-python 2.3.0 documentation

For other languages supporting MSK IAM authentication, refer to the AWS MSK IAM client configuration guide.


Installation

1. Configure AWS Profile

Set up a local AWS profile by running aws configure, which prompts for your access key, secret key, and default region (eu-west-1). Use the keys provided by Mobito.

aws configure creates a default AWS profile. To create a named profile instead, use:

aws configure --profile <name>

If you use a named profile, set the following environment variable so that the client uses it:

export AWS_PROFILE=<profile_name>
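
Before starting the consumer, it can help to confirm that the profile you configured actually exists. The following is a minimal sketch, assuming aws configure wrote the keys to the standard shared credentials file (~/.aws/credentials by default), which holds one INI section per profile. The helper name profile_exists is hypothetical and not part of any AWS tooling.

```python
import configparser
import os
from pathlib import Path


def profile_exists(profile_name, credentials_path=None):
    """Return True if the shared credentials file has a section for the profile."""
    if credentials_path is None:
        # Assumption: the default location used by the AWS CLI
        credentials_path = Path.home() / ".aws" / "credentials"
    parser = configparser.ConfigParser(interpolation=None)
    # read() silently skips missing files, so a missing file yields False
    parser.read(credentials_path)
    return parser.has_section(profile_name)


if __name__ == "__main__":
    name = os.environ.get("AWS_PROFILE", "default")
    print(f"Profile {name!r} found: {profile_exists(name)}")
```

This only checks that the profile section is present. It does not verify that the keys are valid or that they grant access to the MSK cluster.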

2. Set Up Python Environment

Create a virtual environment and install the required packages. You can replace env with a name of your choice:

python3 -m venv env
source env/bin/activate
pip install kafka-python aws-msk-iam-sasl-signer-python grpcio-tools protobuf lz4

3. Compile the Provided .proto File

Run the following command in the directory that contains the provided .proto file, here named vehicle_data.proto. It generates the Python file vehicle_data_pb2.py, which the Python client below imports.

python -m grpc_tools.protoc -I. --python_out=. vehicle_data.proto

4. Python Client

Create a Python file with the following code, replacing <BROKERS_URLS>, <TOPIC_NAME>, and <USERNAME> with the values provided by Mobito.

import json
import datetime
from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka.sasl.oauth import AbstractTokenProvider
from google.protobuf.json_format import MessageToDict
import vehicle_data_pb2

BROKERS_STRING = "<BROKERS_URLS>"


class MSKTokenProvider(AbstractTokenProvider):
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token("eu-west-1")
        return token


tp = MSKTokenProvider()
topic = "<TOPIC_NAME>"

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=BROKERS_STRING.split(","),
    value_deserializer=lambda v: vehicle_data_pb2.VehicleData.FromString(v),
    auto_offset_reset="earliest",
    api_version=(3, 8, 0),
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=tp,
    group_id="<USERNAME>",  # Change to your username
)

# Process each message as needed here. message.value is a decoded VehicleData
# object; MessageToDict(message.value) converts it to a plain dict.
# This example only prints the timestamp of each message.
for message in consumer:
    received_at = datetime.datetime.fromtimestamp(message.timestamp / 1000.0)
    print(f"Received message at {received_at} (Timestamp: {message.timestamp})")
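
Kafka message timestamps are expressed in milliseconds since the Unix epoch, and datetime.fromtimestamp without a timezone renders them in the local time of the machine running the consumer. If you prefer logs that do not depend on the host timezone, a minimal sketch of a UTC conversion follows. The helper name kafka_timestamp_to_utc is hypothetical.

```python
from datetime import datetime, timezone


def kafka_timestamp_to_utc(timestamp_ms):
    """Convert a Kafka timestamp (milliseconds since the epoch) to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc)


if __name__ == "__main__":
    # Example with an arbitrary timestamp value
    print(kafka_timestamp_to_utc(1700000000000).isoformat())
```

Inside the consumer loop, this could be called as kafka_timestamp_to_utc(message.timestamp) in place of the local-time conversion.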

Consumer Group Naming Convention

Consumer group names follow the pattern <username>*, meaning you can use any group name that starts with your username. For example, with the username mobito, you can use mobito-test to test the connection and then switch to mobito as the consumer group in production. Because a new group has no committed offsets and the client above sets auto_offset_reset="earliest", the new group starts consuming messages from the beginning.

Replace mobito with your own username.
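
To catch a mistyped group name before connecting, the naming convention can be checked on the client side. This is a minimal sketch that assumes the pattern is a plain prefix match on the username. The actual check happens on the Mobito side, and the helper name is_allowed_group_id is hypothetical.

```python
def is_allowed_group_id(username, group_id):
    """Return True if group_id starts with username (the <username>* pattern)."""
    # An empty username would match everything, so treat it as invalid
    return bool(username) and group_id.startswith(username)


if __name__ == "__main__":
    for candidate in ("mobito", "mobito-test", "test-mobito"):
        print(candidate, is_allowed_group_id("mobito", candidate))
```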
