AWS Big Data Blog

Introducing the new Amazon Kinesis source connector for Apache Flink

On November 11, 2024, the Apache Flink community released a new version of AWS services connectors, an AWS open source contribution. This new release, version 5.0.0, introduces a new source connector to read data from Amazon Kinesis Data Streams. In this post, we explain how the new features of this connector can improve performance and reliability of your Apache Flink application.

Apache Flink has both a source and sink connector, to read from and write to Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 does not introduce new functionality for the sink.

Apache Flink is a framework and distributed stream processing engine designed to perform computation at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience to run your Flink applications, implemented in Java, Python or SQL, and using all the APIs available in Flink: SQL, Table, DataStream, and ProcessFunction API.

Apache Flink connectors

Flink supports reading and writing data to external systems, through connectors, which are components that allow your application to interact with stream-storage message brokers, databases, or object stores. Kinesis Data Streams is a popular source and destination for streaming applications. Flink provides both source and sink connectors for Kinesis Data Streams.

The following diagram illustrates a sample architecture.

Role of connectors in a Flink applications

Before proceeding further, it’s important to clarify three terms often used interchangeably in data streaming and in the Apache Flink documentation:

  • Kinesis Data Streams refers to the Amazon service
  • Kinesis source and Kinesis consumer refer to the Apache Flink components, in particular the source connectors, that allows reading data from Kinesis Data Streams
  • In this post, we use the term stream referring to a single Kinesis data stream

Introducing the new Flink Kinesis source connector

The launch of the version 5.0.0 of AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector is called Kinesis Streams Source and supersedes the Kinesis Consumer as the source connector for Kinesis Data Streams.

The new connector introduces several new features and adheres to the new Flink Source interface, and is compatible with Flink 2.x, the first major version release by the Flink community. Flink 2.x introduces a number of breaking changes, including removing the SourceFunction interface used by legacy connectors. The legacy Kinesis Consumer will no longer work with Flink 2.x.

Setting up the connector is slightly different than with the legacy Kinesis connector. Let’s start with the DataStream API.

How to use the new connector with the DataStream API

To add the new connector to your application, you need to update the connector dependency. For the DataStream API, the dependency has changed its name to flink-connector-aws-kinesis-streams.

At the time of writing, the latest connector version is 5.0.0 and it supports the most recent stable Flink versions, 1.19 and 1.20. The connector is also compatible with Flink 2.0, but no connector has been officially released for Flink 2.x yet. Assuming you are using Flink 1.20, the new dependency is the following:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

The connector uses the new Flink Source interface. This interface implements the new FLIP-27 standard, and replaces the legacy SourceFunction interface that has been deprecated. SourceFunction will be completely removed in Flink 2.x.

In your application, you can now use a fluent and expressive builder interface to instantiate and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:

KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
    .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
    .setDeserializationSchema(new SimpleStringSchema())
    .build();

The new source class is called KinesisStreamSource. Not to be confused with the legacy source, FlinkKinesisConsumer.

You can then add the source to the execution environment using the new fromSource() method. This method requires explicitly specifying the watermark strategy, along with a name for the source:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
    kdsSource,
    WatermarkStrategy.<String>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(1)),
    "Kinesis source");

These few lines of code introduce some of the main changes in the interface of the connector, which we discuss in the following sections.

Stream ARN

You can now define the Kinesis data stream ARN, as opposed to the stream name. This makes it simpler to consume from streams cross-Region and cross-account.

When running in Amazon Managed Service for Apache Flink, you only need to add to the application AWS Identity and Access Management (IAM) role permissions to access the stream. The ARN allows pointing to a stream located in a different AWS Region or account, without assuming roles or passing any external credentials.

Explicit watermark

One of the most important characteristics of the new Source interface is that you have to explicitly define a watermark strategy when you attach the source to the execution environment. If your application only implements processing-time semantics, you can specify WatermarkStrategy.noWatermarks().

This is an improvement in terms of code readability. Looking at the source, you know immediately which type of watermark you have, or if you don’t have any. Previously, many connectors were providing some type of default watermarks that the user could override. However, the default watermark of each connector was slightly different and confusing for the user.

With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermarks, using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time when the record was published to Kinesis Data Streams.

Idleness and watermark alignment

With the watermark strategy, you can additionally define an idleness, which allows the watermark to progress even when some shards of the stream are idle and receiving no records. Refer to Dealing With Idle Sources for more details about idleness and watermark generators.

A feature introduced by the new Source interface, and fully supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite direction of idleness. It slows down consuming from a shard that is progressing faster than others. This is particularly useful when replaying data from a stream, to reduce the volume of data buffered in the application state. Refer to Watermark alignment for more details.

Set up the connector with the Table API and SQL

Assuming you are using Flink 1.20, the dependency containing both Kinesis source and sink for the Table API and SQL is the following (both Flink 1.19 and 1.20 are supported, adjust the version accordingly):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

This dependency contains both the new source and the legacy source. Refer to Versioning in case you are planning to use both in the same application.

When defining the source in SQL or the Table API, you use the connector name kinesis, as it was with the legacy source. However, many parameters have changed with the new source:

CREATE TABLE KinesisTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `category_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
    'aws.region' = 'us-east-1',
    'source.init.position' = 'LATEST',
    'format' = 'csv'
);

A couple of notable connector options changed from the legacy source are:

  • stream.arn specifies the stream ARN, as opposed to the stream name used in the legacy source.
  • init.initpos defines the starting position. This option works similarly to the legacy source, but the option name is different. It was previously scan.stream.initpos.

For the full list of connector options refer to Connector Options.

New features and improvements

In this section, we discuss the most important features introduced by the new connector. These features are available in the DataStream API, and also the Table API and SQL.

Ordering guarantees

The most important improvement introduced by the new connector is about ordering guarantees.

With Kinesis Data Streams, the order of the message is retained per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when the stream scales, splitting or merging shards, records with the same partitionId end up in a new shard. Kinesis keeps track of the parent-child lineage when resharding happens.

Stream resharding

One known limitation of the legacy Kinesis source is that it was unable to follow the parent-child shard lineage. As a consequence, ordering could not be guaranteed when resharding happens. The problem was particularly relevant when the application replayed old messages from a stream that had been resharded because ordering would be lost. This also made watermark generation and event-time processing non-deterministic.

With the new connector, ordering is retained also when resharding happens. This is achieved following the parent-child shard lineage, and consuming all records from a parent shard before proceeding with the child shard.

How the connector follows shard lineage

A better default shard assigner

Each Kinesis data stream is comprised of many shards. Also, the Flink source operator runs in multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. The shard assigner’s job is non-trivial, because shard split or merge operations (resharding) might happen when the stream scales up or down.

The new connector comes with a new default assigner, UniformShardAssigner. This assigner maintains uniform distribution of the stream partitionId across parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.

This shard assigner was already available in the previous connector version, but for backward compatibility, it was not the default and you had to set it up explicitly. This is no longer the case with the new source. The old default shard assigner, the legacy FlinkKinesisConsumer, was evenly distributing shards (not partitionId) across subtasks. In this case, the actual data distribution might become uneven in the case of resharding, because of the combination of open and closed shards in the stream. Refer to Shard Assignment Strategy for more details.

Reduced JAR size

The size of the JAR file has been reduced by 99%, from about 60 MB down to 200 KB. This substantially reduces the size of the fat-JAR of your application using the connector. A smaller JAR can speed up many operations that require redeploying the application.

AWS SDK for Java 2.x

The new connector is based on the newer AWS SDK for Java 2.x, which adds several features and improves support for non-blocking I/O. This makes the connector future-proof because the AWS SDK v1 will reach end-of-support by end of 2025.

AWS SDK built-in retry strategy

The new connector relies on the AWS SDK built-in retry strategy, as opposed to a custom strategy implemented by the legacy connector. Relying on the AWS SDK improves the classification of some errors as retriable or non-retriable.

Removed dependency on the Kinesis Client Library and Kinesis Producer Library

The new connector package no longer includes the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), contributing to the substantial reduction of the JAR size that we have mentioned.

An implication of this change is that the new connector no longer supports de-aggregation out of the box. Unless you are publishing records to the stream using the KPL and you enabled aggregation, this will not make any difference for you. If your producers use KPL aggregation, you might consider implementing a custom DeserializationSchema to de-aggregate the records in the source.

Migrating from the legacy connector

Flink sources typically save the position in the checkpoint and savepoints, called snapshots in Amazon Managed Service for Apache Flink. When you stop and restart the application, or when you update the application to deploy a change, the default behavior is saving the source position in the snapshot just before stopping the application, and restoring the position when the application restarts. This allows Flink to provide exactly-once guarantees on the source.

However, due to the major changes introduced by the new KinesisSource, the saved state is no longer compatible with the legacy FlinkKinesisConsumer. This means that when you upgrade the source of an existing application, you can’t directly restore the source position from the snapshot.

For this reason, migrating your application to the new source requires some attention. The exact migration process depends on your use case. There are two general scenarios:

  • Your application uses the DataStream API and you are following Flink best practices defining a UID on each operator
  • Your application uses the Table API or SQL, or your application used the DataStream API and you are not defining a UID on each operator

Let’s cover each of these scenarios.

Your application uses the DataStream API and you are defining a UID on each operator

In this case, you might consider selectively resetting the state of the source operator, retaining any other application state. The general approach is as follows:

  1. Update your application dependencies and code, replacing the FlinkKinesisConsumer with the new KinesisSource.
  2. Change the UID of the source operator (use a different string). Leave all other operators’ UIDs This will selectively reset the state of the source while retaining the state of all other operators.
  3. Configure the source starting position using AT_TIMESTAMP and set the timestamp to just before the moment you will deploy the change. See Configuring Starting Position to learn how to set the starting position. We recommend passing the timestamp as a runtime property to make this more flexible. The configured source starting position is used only when the application can’t restore the state from a savepoint (or snapshot). In this case, we are deliberately forcing this, changing the UID of the source operator.
  4. Update the Amazon Managed Service for Apache Flink application, selecting the new JAR containing the modified application. Restart from the latest snapshot (default behavior) and select allowNonRestoredState = true. Without this flag, Flink would prevent restarting the application, not being able to restore the state of the old source that was saved in the snapshot. See Savepointing for more details about allowNonRestoredState.

This approach will cause the reprocessing of some records from the source, and internal state exactly-once consistency can be broken. Carefully evaluate the impact of reprocessing on your application, and the impact of duplicates on the downstream systems.

Your application uses the Table API or SQL, or your application used the DataStream API and you are not defining a UID on each operator

In this case, you can’t selectively reset the state of the source operator.

Why does this happen? When using the Table API or SQL, or the DataStream API without defining the operator’s UID explicitly, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when saved in the snapshots, and to restore it to the correct operator when you restart the application.

Changes to the application might cause changes in the underlying data flow. This changes the auto-generated identifier. If you are using the DataStream API and you are specifying the UID, Flink uses your identifiers instead of the auto-generated identifies, and is able to map back the state to the operator, even when you make changes to the application. This is an intrinsic limitation of Flink, explained in Set UUIDs For All Operators. Enabling allowNonRestoredState does not solve this problem, because Flink is not able to map the state saved in the snapshot with the actual operators, after the changes.

In our migration scenario, the only option is resetting the state of your application. You can achieve this in Amazon Managed Service for Apache Flink by selecting Skip restore from snapshot (SKIP_RESTORE_FROM_SNAPSHOT) when you deploy the change that replaces the source connector.

After the application using the new source is up and running, you can switch back to the default behavior of when restarting the application, using the latest snapshots (RESTORE_FROM_LATEST_SNAPSHOT). This way, no data loss happens when the application is restarted.

Choosing the right connector package and version

The dependency version you need to pick is normally <connector-version>-<flink-version>. For example, the latest Kinesis connector version is 5.0.0. If you are using a Flink runtime version 1.20.x, your dependency for the DataStream API is 5.0.0-1.20.

For the most up-to-date connector versions, see Use Apache Flink connectors with Managed Service for Apache Flink.

Connector artifact

In previous versions of the connector (4.x and before), there were separate packages for the source and sink. This additional level of complexity has been removed with version 5.x.

For your Java application, or Python applications where you package JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository, the following dependency contains the new version of both source and sink connectors:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

Make sure you’re using the latest available version. At the time of writing, this is 5.0.0. You can verify the available artifact versions in Maven Central. Also, use the correct version depending on your Flink runtime version. The previous example is for Flink 1.20.0.

Connector artifacts for Python application

If you use Python, we recommend packaging JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository. However, if you’re passing directly a single JAR to your Amazon Managed Service for Apache Flink application, you need to use the artifact that includes all transitive dependencies. In the case of the new Kinesis source and sink, this is called flink-sql-connector-aws-kinesis-streams. This artifact includes only the new source. Refer to Amazon Kinesis Data Streams SQL Connector for the right package, in case you want to use both the new and the legacy source.

Conclusion

The new Flink Kinesis source connector introduces many new features that improve stability and performance, and prepares your application for Flink 2.x. Support for watermark idleness and alignment is a particularly important feature if your application uses event-time semantics. The ability to retain record ordering improves data consistency, in particular when stream resharding happens, and when you replay old data from a stream that has been reshared.

You should carefully plan the change if you’re migrating your application from the legacy Kinesis source connector, and make sure you follow Flink’s best practices like specifying a UID on all DataStream operators.

You can find a working example of Java DataStream API application using the new connector, in the Amazon Managed Service for Apache Flink samples GitHub repository.

To learn more about the new Flink Kinesis source connector, refer to Amazon Kinesis Data Streams Connector and Amazon Kinesis Data Streams SQL Connector.


About the Author

Lorenzo NicoraLorenzo Nicora works as a Senior Streaming Solutions Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open source technologies extensively and contributed to several projects, including Apache Flink.