flink消费kafka的offset与checkpoint

flink消费kafka的offset与checkpoint

技术教程gslnedu2024-12-18 13:12:2612A+A-

生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis。使用的flink版本为1.11.1。

为了防止写入hive的文件数量过多,我设置了checkpoint为30分钟。

env.enableCheckpointing(1000 * 60 * 30); // 1000 * 60 * 30 => 30 minutes

达到的效果就是每30分钟生成一个文件,如下:

hive> dfs -ls /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/ ;
Found 10 items
-rw-r--r--   3 hdfs hive          0 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/_SUCCESS
-rw-r--r--   3 hdfs hive     248895 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10911
-rw-r--r--   3 hdfs hive     306900 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10912
-rw-r--r--   3 hdfs hive     208227 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10913
-rw-r--r--   3 hdfs hive     263586 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10911
-rw-r--r--   3 hdfs hive     307723 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10912
-rw-r--r--   3 hdfs hive     196777 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10913
-rw-r--r--   3 hdfs hive     266984 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10911
-rw-r--r--   3 hdfs hive     338992 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10912
-rw-r--r--   3 hdfs hive     216655 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10913
hive> 

但是,同时也观察到归属于这个作业的kafka消费组积压数量,每分钟消费数量,明显具有周期性消费峰值。

比如,对于每30分钟时间间隔度的一个观察,前面25分钟的“每分钟消费数量”都是为0,然后,后面5分钟的“每分钟消费数量”为300k。同理,“消费组积压数量”也出现同样情况,积压数量一直递增,但是到了30分钟的间隔,就下降到数值0。如图。

消费组每分钟消费数量

消费组积压数量

但其实,通过对hbase,hive,redis的观察,数据是实时写入的,并不存在前面25分钟没有消费数据的情况。

查阅资料得知,flink会自己维护一份kafka的offset,然后checkpoint时间点到了,再把offset更新回kafka。

为了验证这个观点,“flink在checkpoint的时候,才把消费kafka的offset更新回kafka”,同时,观察,savepoint机制是否会重复消费kafka,我尝试写一个程序,逻辑很简单,就是从topic "test"读取数据,然后写入topic "test2"。特别说明,这个作业的checkpoint是1分钟。

package com.econ.powercloud.jobsTest;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import javax.annotation.Nullable;
import java.util.Properties;

public class TestKafkaOffsetCheckpointJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1000 * 60);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String bootstrapServers = parameterTool.get("bootstrap.servers") == null ? "localhost:9092" : parameterTool.get("bootstrap.servers");

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local");
        properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));

        String topic = "test";
        FlinkKafkaConsumer<String> stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);

        DataStreamSource<String> stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);

        String producerTopic = "test2";
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                return new ProducerRecord<>(producerTopic, element.getBytes());
            }
        }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stringDataStreamSource.addSink(kafkaProducer);

        env.execute("TestKafkaOffsetCheckpointJob");
    }
}

提交作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6
[econ@dev-hadoop-node-c ~]$ 

使用"kafka-console-producer.sh"往topic "test"生成消息"a1":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:
>a1
>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1

证明作业逻辑本身没有问题,实现' 从topic "test"读取数据,然后写入topic "test2" '。

使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量,重点观察指标"LAG",可以看到LAG为1 :

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; 
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            1          3               3               0               -               -               -
test            0          3               3               0               -               -               -
test            2          5               6               1               -               -               -
2020年10月18日 星期日 20时09分45秒 CST
RdeMacBook-Pro:kafka r$ 

证明flink消费了kafka数据后,不会更新offset到kafka。

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6
Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.
Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd
[econ@dev-hadoop-node-c ~]$ 

再次启动作业,但是,不使用上面生成的savepoint:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83
[econ@dev-hadoop-node-c ~]$ 

观察topic "test2",发现,同样的数据"a1"被生产进入:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
a1

证明:flink在没有使用savepoint的时候,消费kafka的offset还是从kafka自身获取。

再仔细观察topic "test"的“消费组积压数量”,注意在"20时10分05秒"还观察到积压数值1,但是在"20时10分08秒"就发现积压数值都是0.

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; 
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            1          3               3               0               -               -               -
test            0          3               3               0               -               -               -
test            2          5               6               1               -               -               -
2020年10月18日 星期日 20时10分05秒 CST
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; 
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            1          3               3               0               -               -               -
test            0          3               3               0               -               -               -
test            2          6               6               0               -               -               -
2020年10月18日 星期日 20时10分08秒 CST
RdeMacBook-Pro:kafka r$ 

这是因为,在"20:10:06"完成了一次checkpoint,把offset更新回kafka。

Flink Checkpoint History

下面接着测试flink使用savepoint的情况下,是否会重复消费kafka数据。

使用"kafka-console-producer.sh"往topic "test"生成消息"a2":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
>a1
>a2
>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
a1
a2

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179
Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.
Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60
[econ@dev-hadoop-node-c ~]$ 

观察topic "test"的“消费组积压数量”,发现LAG还是1:

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; 
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            1          3               4               1               -               -               -
test            0          3               3               0               -               -               -
test            2          6               6               0               -               -               -
2020年10月18日 星期日 20时28分39秒 CST
RdeMacBook-Pro:kafka r$ 

flink使用savepoint启动作业,注意参数"-s":

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb
[econ@dev-hadoop-node-c ~]$ 

观察"kafka-console-consumer.sh"消费topic "test2"的情况,没有新的消息被打印:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
a1
a2

再观察“消费组积压数量”,发现LAG值已经全部是0。

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; 
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            1          4               4               0               -               -               -
test            0          3               3               0               -               -               -
test            2          6               6               0               -               -               -
2020年10月18日 星期日 20时31分43秒 CST
RdeMacBook-Pro:kafka r$ 

证明:flink使用savepoint启动作业,不会重复消费kafka数据,也会正确更新kafka的offset。

重申,以上试验证明:

  1. flink消费了kafka数据后,不会更新offset到kafka,直到checkpoint完成。
  2. flink在没有使用savepoint重启作业的时候,消费kafka的offset还是从kafka自身获取,存在重复消费数据的情况。
  3. flink使用savepoint重启作业,不会重复消费kafka数据,也会正确更新kafka的offset。

点击这里复制本文地址 以上内容由朽木教程网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

朽木教程网 © All Rights Reserved.  蜀ICP备2024111239号-8