Parsing JSON strings from Kafka using Apache Flink and GSON

Category: Data Science

In this blog I discuss stream processing with Apache Flink and Kafka. Specifically, I look at parsing and processing JSON strings in real time in an object-oriented way, using Flink’s Java API and a sports data use case. If you follow the steps described here, you should be able to apply the same concept to other real-time stream processing projects.

For the described implementation, I set up a Flink skeleton project following the Flink documentation linked below. I used football position data with the following format:

    {"Timestamp": 82.6,
     "pos": [
       {"X": 12.5, "Y": -23.5, "PlrID": 0.0, "TeamName": null},
       {"X": -34.468, "Y": -0.774, "PlrID": 1.0, "TeamName": "Team 1"},
       {"X": -3.611, "Y": 1.898, "PlrID": 2.0, "TeamName": "Team 1"},
       {"X": 48.028, "Y": -1.327, "PlrID": 13.0, "TeamName": "Team 2"},
       {"X": 28.363, "Y": -25.225, "PlrID": 14.0, "TeamName": "Team 2"}
     ]}

These data describe the positions of the players and the ball on the field, as available in some commercial player tracking systems in the form of near real-time streams of 10-30 measurements per second. 

Steps taken in the Flink skeleton project can be found here

Maven dependencies

By default, Flink ships with only a few basic connectors, such as reading from and writing to files and sockets, which are mostly useful for testing. More advanced connections, such as to a Kafka cluster, require additional dependencies. With Apache Maven, you include them by adding extra entries to the pom.xml file. The following lines add the Kafka connector for Kafka versions 1.0.0 and higher:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

The messages in Kafka topics are essentially bytes representing JSON strings. Using Flink’s SimpleStringSchema, we can interpret these bytes as strings. What we are really interested in, however, is the object and the hierarchical data it represents. For this, we need some kind of JSON parser, and it just so happens that Google’s GSON library is an excellent one. It interacts with Java objects in intuitive ways and lets us load the contents of JSON strings directly into Java objects. To use it, we need to add an extra dependency to our pom.xml file:

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>

More information about the Kafka connector dependencies can be found here.

Kafka IO

I will start by adding our Kafka connectors. In stream processing terminology, a connector that reads data from an external source is referred to as a consumer. A connector that writes data to an external sink is referred to as a producer. First, we need to import Flink’s Kafka consumer, Kafka producer, and a few other classes that are used for configuring the connectors, parsing bytes from Kafka and manipulating data streams:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

The DataStream is the main interface for Flink data streams and provides many member functions that are useful for manipulating them. A DataStream needs to have a specific type defined, and essentially represents an unbounded stream of data structures of that type. For example, DataStream<String> represents a data stream of strings.

Now, we use Flink’s Kafka consumer to read data from a Kafka topic. We do so by including the following code in the StreamingJob class’ main function, after the env variable declaration:

    // Set up the consumer and create a data stream from this source
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
        "instream",               // input topic
        new SimpleStringSchema(), // deserialization schema
        properties);              // consumer properties

    DataStream<String> inputStream = env.addSource(myConsumer);

A couple of things happen here. We configure a FlinkKafkaConsumer to read from the instream topic at address localhost:9092, which is where our dockerized Kafka broker runs.

We also provide a SimpleStringSchema, which tells the consumer to interpret bytes from Kafka as strings. The env.addSource() call then returns a DataStream<String>, stored in the variable inputStream, which represents the stream of strings coming from Kafka.
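To make the role of the schema concrete, the sketch below shows roughly what happens to the value of each Kafka record: the raw bytes are decoded into a Java string. It uses only the JDK and assumes UTF-8, which is the default charset of SimpleStringSchema. The class name BytesToStringSketch and the sample payload are made up for illustration and are not part of the streaming job.

```java
import java.nio.charset.StandardCharsets;

public class BytesToStringSketch {

    // Decode the raw value bytes of a Kafka record into a string (UTF-8 assumed).
    public static String decode(byte[] recordValue) {
        return new String(recordValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload, as a producer could have written it to the topic.
        byte[] recordValue =
            "{\"Timestamp\": 82.6, \"pos\": []}".getBytes(StandardCharsets.UTF_8);

        System.out.println(decode(recordValue));
    }
}
```

The result is still just text. Turning it into objects is a separate step, which is where GSON comes in later.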

For now, we just want to write the received JSON strings back to Kafka. To do so, we add a FlinkKafkaProducer as a sink:

    // Set up the producer and add it as a data stream sink
    FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
        "localhost:9092",          // broker list
        "outstream",               // target topic
        new SimpleStringSchema()); // serialization schema
    myProducer.setWriteTimestampToKafka(true);

    inputStream.addSink(myProducer);

The producer is set up to write messages to the outstream topic on the Kafka broker at localhost:9092. We use a SimpleStringSchema again, this time to create a producer that expects JSON strings.

When the code in this section is added to the StreamingJob class’ main member function, the code should compile without any problems! When submitted to the Flink cluster, it will read JSON strings from the instream topic in the Kafka cluster and immediately write the received strings back to the outstream topic.

Of course, performing operations directly on the JSON string itself is cumbersome. It contains hierarchical information that we would rather access in an object-oriented way. In the remainder of this blog, I will present more code to make this much easier.

For more information about all the connectors that Flink offers, check out this link and this link.

Data structures

To elegantly access the information in JSON strings in an object-oriented way, I will use GSON, a JSON library for Java. In order to do this effectively, we need to define POJOs (Plain Old Java Objects) with a format matching the expected input position data. We define a class that can hold the position of a player, including the identifier and team name. The following code is saved to a new file Position.java in the package directory:

    package flinkjob.pojo;

    public class Position
    {
        public Double X;
        public Double Y;
        public Double PlrID;
        public String TeamName;

        public Position() {}
    }

Position.java

We use this class to create a new InputMessage class that matches the expected format of the position data input messages. The code for this is saved to InputMessage.java in the flinkjob.pojo package directory:

    package flinkjob.pojo;

    public class InputMessage
    {
        public double Timestamp;
        public Position[] pos;

        public InputMessage() {}
    }

InputMessage.java

If you compare this class to the example position data message shown earlier in this post, you will see that it describes the expected format. The Timestamp field contains a double value, and the pos field is an array of Position objects, each with X, Y, PlrID and TeamName fields. The public member variables match the JSON field names exactly, including capitalization. Notice how both classes have an empty constructor. Flink requires a public no-argument constructor to recognize a class as a POJO and serialize it with its own POJO serializer; without one, Flink does not treat the class as a POJO and falls back to its slower generic (Kryo-based) serializer.
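To illustrate how the JSON structure corresponds to these classes, the following sketch builds by hand the objects that the first two entries of the sample message would map to, and then reads a few fields. It is a minimal, JDK-only illustration: the wrapper class PojoSketch and the helpers position, sample and countTeam are hypothetical names, and the two POJOs are copied in as nested classes so the example fits in one file.

```java
import java.util.Arrays;

public class PojoSketch {

    // Copies of the two POJOs above, nested so the example fits in one file.
    public static class Position {
        public Double X;
        public Double Y;
        public Double PlrID;
        public String TeamName;

        public Position() {}
    }

    public static class InputMessage {
        public double Timestamp;
        public Position[] pos;

        public InputMessage() {}
    }

    // Hypothetical helper that fills a Position in one call.
    static Position position(Double x, Double y, Double id, String team) {
        Position p = new Position();
        p.X = x;
        p.Y = y;
        p.PlrID = id;
        p.TeamName = team;
        return p;
    }

    // Hand-built equivalent of the first two entries of the sample message.
    public static InputMessage sample() {
        InputMessage m = new InputMessage();
        m.Timestamp = 82.6;
        m.pos = new Position[] {
            position(12.5, -23.5, 0.0, null),
            position(-34.468, -0.774, 1.0, "Team 1")
        };
        return m;
    }

    // Count the entries that belong to the given team.
    public static long countTeam(InputMessage m, String team) {
        return Arrays.stream(m.pos)
                .filter(p -> team.equals(p.TeamName))
                .count();
    }

    public static void main(String[] args) {
        InputMessage m = sample();
        System.out.println(m.pos[1].TeamName);      // prints "Team 1"
        System.out.println(countTeam(m, "Team 1")); // prints 1
    }
}
```

Because X, Y and PlrID are declared as Double rather than double, a missing or null value in the JSON can be represented as null instead of silently becoming 0.0.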

Parsing with GSON

In order to parse JSON strings properly, we first need to include a GSON instance in our streaming job. To do this, add the following imports to StreamingJob.java:

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;

We can then include the GSON instance by adding the following member variable declaration to the StreamingJob class:

    private static final Gson gson =
        new GsonBuilder().serializeSpecialFloatingPointValues().create();

Now, we can use GSON to parse the JSON strings contained within the inputStream data stream. In order to do so, import the data structure we want to load the message data into:

    import flinkjob.pojo.InputMessage;

Then, update the StreamingJob’s main function with the following line of code:

    DataStream<InputMessage> messageStream = inputStream
        .map(json -> gson.fromJson(json, InputMessage.class));

In this code, we map the stream of strings to another format using the DataStream member function map, which performs a 1-to-1 transformation of every element in the stream. Inside this map call we use the GSON function fromJson(..., InputMessage.class) to transform individual strings to matching InputMessage objects.
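The one-in, one-out behaviour of map is the same idea as Stream.map in the JDK. The sketch below uses a plain Java stream instead of a Flink DataStream so that it runs without a cluster. Double.parseDouble is only a stand-in for the gson.fromJson call, and the class name MapSketch and its input values are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapSketch {

    // Turn every string into exactly one typed value, preserving order.
    public static List<Double> parseAll(List<String> raw) {
        return raw.stream()
                .map(Double::parseDouble) // 1-to-1: each input yields one output
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("82.6", "82.7", "82.8");
        System.out.println(parseAll(raw)); // three strings in, three doubles out
    }
}
```

The main difference in Flink is that the stream is unbounded: the function passed to map is applied to each element as it arrives, rather than to a finite list.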

Now, we can access the information contained within individual JSON strings in an object-oriented way, which is what we wanted to achieve! For example, here is a map function that filters out participants with invalid coordinates:

    package flinkjob.maps;

    import java.util.Arrays;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import flinkjob.pojo.InputMessage;
    import flinkjob.pojo.Position;

    public class FilterParticipantsMapFunction extends RichMapFunction<InputMessage, InputMessage>
    {
        @Override
        public InputMessage map(InputMessage input)
        {
            // Build a new message instead of modifying the input in place
            InputMessage output = new InputMessage();
            output.Timestamp = input.Timestamp;

            // Keep only the positions that have both an X and a Y coordinate
            output.pos = Arrays.stream(input.pos)
                .filter(p -> (p.X != null && p.Y != null))
                .toArray(Position[]::new);

            return output;
        }
    }

FilterParticipantsMapFunction.java
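To see what this filter does to a single message without running a Flink job, the same stream expression can be tried on a hand-built array. This is a minimal sketch: the nested Position class is a trimmed stand-in for the POJO defined earlier, keepValid is a hypothetical helper name, and the sample values (including the position with a missing X) are invented for illustration.

```java
import java.util.Arrays;

public class FilterSketch {

    // Trimmed stand-in for flinkjob.pojo.Position.
    public static class Position {
        public Double X;
        public Double Y;

        public Position(Double x, Double y) {
            this.X = x;
            this.Y = y;
        }
    }

    // Same predicate as in FilterParticipantsMapFunction:
    // keep only positions that have both coordinates.
    public static Position[] keepValid(Position[] positions) {
        return Arrays.stream(positions)
                .filter(p -> p.X != null && p.Y != null)
                .toArray(Position[]::new);
    }

    public static void main(String[] args) {
        Position[] input = {
            new Position(12.5, -23.5),
            new Position(null, 1.898), // invalid: X is missing
            new Position(48.028, -1.327)
        };

        System.out.println(keepValid(input).length); // prints 2
    }
}
```

Note that the predicate only checks for null. Whether values such as NaN should also count as invalid is a design decision the filter leaves open.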

Import this custom map function as follows:

    import flinkjob.maps.FilterParticipantsMapFunction;

Now, we can map the parsed input stream to a new message stream with filtered message contents:

    DataStream<InputMessage> processedStream = messageStream
        .map(new FilterParticipantsMapFunction());

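Since our producer expects a stream of JSON strings, we transform the stream of objects back to a stream of JSON strings using GSON’s toJson() function. The new sink takes the place of the earlier inputStream.addSink(myProducer) call; keeping both would write every message to the outstream topic twice, once unfiltered and once filtered: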

    DataStream<String> outputStream = processedStream
        .map(element -> gson.toJson(element));

    outputStream.addSink(myProducer);

After adding the new code described in this section, you should be able to compile the Flink streaming job without any errors! Parsing streams of JSON strings the way I described makes it easy to access the contained hierarchical data. You should now have a good understanding of how to apply the shown concepts to more complex use cases.

More information

For more information, please contact Jonathan Robijn via jrobijn@qualogy.com.

About the author: Jonathan Robijn

Jonathan Robijn is a Data Scientist at Qualogy. He majored in Computer Science at Leiden University with a minor in Information Technology in Business. During his master's thesis research project, Jonathan applied machine learning to football position data to predict the risk factor of passes. At Qualogy he developed Quarts, a stream processing framework optimized for processing sensor measurement data streams. Jonathan has multiple years of experience with data analysis and data engineering and is fluent in multiple programming languages, including Python and C++.
