如何將Kafka中的二進制數(shù)據(jù)轉換為Java或者Scala對象?
Flink Kafka Consumer 需要知道如何將 Kafka 中的二進制數(shù)據(jù)轉換為 Java 或者 Scala 對象。KafkaDeserializationSchema 允許用戶指定這樣的 schema,每條 Kafka 中的消息會調用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。
為了方便使用,F(xiàn)link 提供了以下幾種 schemas:
SimpleStringSchema:按照字符串方式序列化、反序列化
TypeInformationSerializationSchema(和 TypeInformationKeyValueSerializationSchema) 基于 Flink 的 TypeInformation 創(chuàng)建 schema。如果該數(shù)據(jù)的讀和寫都發(fā)生在 Flink 中,那么這將是非常有用的。此 schema 是其他通用序列化方法的高性能 Flink 替代方案。
JsonDeserializationSchema(和 JSONKeyValueDeserializationSchema)將序列化的 JSON 轉化為 ObjectNode 對象,可以使用 objectNode.get("field").as(Int/String/...)() 來訪問某個字段。KeyValue objectNode 包含一個含所有字段的 key 和 values 字段,以及一個可選的"metadata"字段,可以訪問到消息的 offset、partition、topic 等信息。
AvroDeserializationSchema 使用靜態(tài)提供的 schema 讀取 Avro 格式的序列化數(shù)據(jù)。它能夠從 Avro 生成的類(AvroDeserializationSchema.forSpecific(...))中推斷出 schema,或者可以與 GenericRecords 一起使用手動提供的 schema(用 AvroDeserializationSchema.forGeneric(...))。此反序列化 schema 要求序列化記錄不能包含嵌入式架構!
要使用此反序列化 schema 必須添加以下依賴:
