Skip to main content

Our StreamSets Data Integration Patterns

With our continuous and expanding use of StreamSets as our data integration component we started formalizing some emerging patterns that allowed us to:

  • Simplify maintenance

  • Avoid downtimes

  • Take advantage of the use of Kafka, HAproxy and pipeline replication to scale integrations with large data volumes or complex transformations

What follows is a list of some of our most used patterns.

Logical High-Level Data Integration Pattern

As a general rule, we divide each integration into 3 stages that can contain 3 or more pipelines.

/galleries/post-images/our-streamsets-data-integration-patterns/integration-standard.png

The advantages for this approach are:

  • Stages isolation allows process changes at a specific stage without affecting the others.

  • Kafka in the middle assures that if a parser or store is down, data continues to be consumed and stored in Kafka for 7 days.

  • Kafka also allows parallel processing with replicated pipelines.

  • The parsed logs pushed to Kafka can be accessed immediately by other tools such as Flink Streaming, KSQL or Machine Learning containers without replicating the rules applied in the Streamsets pipelines.

  • We can have pipelines running and consuming logs in computers outside our cluster and pushing the data to Kafka. Later the parsing and storage is centralized in the cluster.

Stage 1 - Consumer Pipeline

A StreamSets pipeline consumes raw data and pushes it into a Kafka Topic with the name projectname.raw.integration. For example projectx.raw.weblogic

  • This allows us to modify, stop and restart the Parse and Store pipelines without losing the data generated during maintenance windows

  • This pipeline is not needed for agents that write directly to kafka

  • For some specific cases, the same pipeline or another one writes the data to HDFS or external storage that needs the raw data. Mainly sources required for audits.

/galleries/post-images/our-streamsets-data-integration-patterns/1-consumer.png

Stage 2 - Parse & Enrichment Pipeline

A second pipeline consumes the raw data from the raw topic, parses and enriches the log and stores the new data in a new topic: projectname.parsed.integration

The most common transformations are:

  • Parse log pattern using Log Parser.

  • Remove unnecessary fields with Field Remover.

  • Rename or change a field's case using Field Renamer.

  • Generate new fields using Expression Evaluators.

  • Enrichment using Redis Lookup or Apache Solr as key value stores. For example:

    • Add an environment field (prod, stage, qa, dev) using the hostname as key

    • Add company details using their tax identification number

    • IP geolocation lookup

  • Convert date to UTC or string to numbers using Field Type Converter.

  • Discard records or route them to different Kafka topics using Stream Selector.

  • Flatten fields with Field Flattener.

  • Some complex business rules written in Jython.

  • Generate fields year, month, day for partitioning in Kudu.

  • Generate Globally Unique Identifier (GUID / UUID).

  • Parse date fields.

The following is our “2 - Apache Access - Parser” pipeline:

/galleries/post-images/our-streamsets-data-integration-patterns/apache-log-parser.png

Steps:

1. Consumes logs from the "raw" topic
2. Converts HTTP body to Json
3. Pivot Body fields to the root of the Record
4. Route apache error and apache access to different paths

   1.1. Parse apache error
   1.2. Add Kafka destination: parsed.apache_error topic

   2.1. Parse apache access
   2.2. Add Kafka destination: parsed.apache_access topic

5. Flatten fields
6. Remove headers and parsed names
7. Remove fields with raw data
8. Clean host names (lowercase and remove domain)
9. Solve environment using Redis
10. Add an ID field with an UUID
11. Convert dates to ISO
12. Convert timestamp fields to Long
13. Convert timestamps to milliseconds
14. Resolve apache access refererer
15. For apache access logs, use the IP address to identify geolocation
    using Apache Solr
16. Write the log to project_name.parsed.apache_access
    or project_name.parsed.apache_error

Stage 3 - Storage & Visualization Pipelines

The third pipeline consumes the parsed data from the topic and sends it to:

  • A Kudu table

  • HDFS folder

  • External storage (outside our cluster) such as Oracle databases

  • NAS storage for historical and backup such as EMC Isilon

  • Apache Solr

  • External Kafka Brokers (outside our cluster)

  • Instadeq for live dashboards and data exploration

Sometimes we use a single pipeline in StreamSets but if the destinations have different speeds or if one of them has more errors that provokes pipeline restarts, we use different pipelines for each destination.

We maintain the number 3 for all the pipelines because they all consume the logs from the parsed topics but send them to different destinations:

  • 3- Tomcat Access - Kudu Storage

  • 3- Tomcat Access - HDFS Storage

  • 3- Tomcat Access - Long-Term Isilon Storage

  • 3- Tomcat Access - Instadeq Dashboard

/galleries/post-images/our-streamsets-data-integration-patterns/3-store.png

Scaling Pipelines

When we have a pipeline with a high data volume, we scale it with:

For example in two customers we receive Tomcat access logs through HTTP posts and we have the following setup:

/galleries/post-images/our-streamsets-data-integration-patterns/lb-and-paralelism-flows.png

HAproxy Load Balancer

We create a new entry in our server running HAproxy: /etc/haproxy/haproxy.cfg

We receive HTTP Requests on port 4005 and forward them to 3 different “1- Tomcat Access - Consumer” StreamSets pipelines running in cluster-host-01, cluster-host-02 and cluster-host-03

frontend tomcat_access
bind 0.0.0.0:4005
mode http
stats enable
stats refresh 10s
stats hide-version
default_backend tomcat_access_servers

tomcat_access_servers
balance roundrobin
default-server maxconn 20
server sdc1 cluster-host-01:4005 check port 4005
server sdc2 cluster-host-02:4005 check port 4005
server sdc3 cluster-host-03:4005 check port 4005

Kafka: Topic Partitioning

We create projectx.raw.tomcat_access and projectx.parsed.tomcat_access Kafka topics with 3 or more partitions:

/bin/kafka-topics.sh --create \
        --zookeeper <hostname>:<port> \
        --topic projectx.raw.tomcat_access \
        --partitions 3 \
        --replication-factor <number-of-replicating-servers>

/bin/kafka-topics.sh --create \
        --zookeeper <hostname>:<port> \
        --topic projectx.parsed.tomcat_access \
        --partitions 3 \
        --replication-factor <number-of-replicating-servers>

Streamsets: Pipeline Replication

We replicate our pipeline to run in different cluster nodes, each one consuming from one of those partitions.

1- Tomcat Access - Consumer

  • Three pipeline instances running in three different nodes

  • Consume logs using a HTTP Server Origin listening on port 4005

  • Write the raw logs to Kafka using a Kafka Producer Destination

  • Target Kafka topic: projectx.raw.tomcat_access

2- Tomcat Access - Parser

  • Three pipeline instances running in three different nodes

  • Consume raw logs from Kafka using a Kafka Consumer Origin

  • Source topic: projectx.raw.tomcat_access

  • Parse, enrich and filter the logs

  • Write them to Kafka using a Kafka Producer Destination

  • Target topic: projectx.parsed.tomcat_access

3- Tomcat Access - Storage

  • Three pipeline instances running in three different nodes

  • Consume enriched Tomcat access logs from Kafka using a Kafka Consumer Origin

  • Source Kafka topic: projectx.parsed.tomcat_access

  • Store them in a Kudu table using a Kudu Destination.

  • Target Kudu table: projectx.tomcat_access

3- Tomcat Access - Instadeq Dashboard

  • A single pipeline instance

  • Consume enriched Tomcat Access logs from the three Kafka partitions using a Kafka Consumer Origin

  • Source Kafka Topic: projectx.parsed.tomcat_access

  • Send them to Instadeq using Instadeq webhooks

Examples

  1. Linux logs from RSysLog to a Kudu table using StreamSets for data ingestion and transformation

/galleries/post-images/our-streamsets-data-integration-patterns/linux-logs.png
  1. Java application logs sent to the cluster using log4j or logback, with StreamSets for data ingestion and transformation, KSQL for streaming analytics, Kudu for storage and Instadeq for live dashboards

/galleries/post-images/our-streamsets-data-integration-patterns/app-logs.png
  1. From Redmine to Instadeq Dashboard using Streamsets: Direct Integration without Kafka or any storage in the middle

/galleries/post-images/our-streamsets-data-integration-patterns/sdc-redmine-to-instadeq.png