如何實現(xiàn) Flink 讀寫數(shù)據(jù)到 Redis ?
API
該方法是將數(shù)據(jù)輸出到Redis數(shù)據(jù)庫中,Redis是一個基于內(nèi)存、性能極高的NoSQL數(shù)據(jù)庫,數(shù)據(jù)還可以持久化到磁盤,讀寫速度快,適合存儲key-value類型的數(shù)據(jù)。Redis不僅僅支持簡單的key-value類型的數(shù)據(jù),同時還提供list,set,zset,hash等數(shù)據(jù)結(jié)構(gòu)的存儲。Flink實時計算出的結(jié)果,需要快速的輸出存儲起來,要求寫入的存儲系統(tǒng)的速度要快,這個才不會造成數(shù)據(jù)積壓。Redis就是一個非常不錯的選擇。
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
RedisSink 核心類是RedisMapper 是一個接口,使用時我們要編寫自己的redis 操作類實現(xiàn)這個接口中的三個方法,如下所示
1.getCommandDescription() :?
設(shè)置使用的redis 數(shù)據(jù)結(jié)構(gòu)類型,和key 的名稱,通過RedisCommand 設(shè)置數(shù)據(jù)結(jié)構(gòu)類型
2.String getKeyFromData(T data):
設(shè)置value 中的鍵值對key的值
3.String getValueFromData(T data);
設(shè)置value 中的鍵值對value的值
使用RedisCommand設(shè)置數(shù)據(jù)結(jié)構(gòu)類型時和redis結(jié)構(gòu)對應(yīng)關(guān)系

需求
從指定的socket讀取數(shù)據(jù),對單詞進行計算,將結(jié)果寫入到Redis中
代碼實現(xiàn)
