tensorflow 六種方法構(gòu)建讀入batch樣本(含序列特征處理),踩坑經(jīng)驗(yàn)值得收藏

tensorflow 六種方法構(gòu)建讀入batch樣本(含序列特征處理),踩坑經(jīng)驗(yàn)值得收藏
書接上文,對 圖機(jī)器學(xué)習(xí)算法 感興趣 的 同學(xué) 可以去 圖算法十篇 之 圖機(jī)器學(xué)習(xí)系列文章總結(jié) ?這里查看,對 推薦廣告算法 感興趣 的同學(xué) 可以去這里 系列小作文之企業(yè)級(jí)機(jī)器學(xué)習(xí)pipline總結(jié) 查看,干貨多多 哦! 而對 使用 tensorflow 實(shí)現(xiàn) 復(fù)雜機(jī)器學(xué)習(xí)/深度學(xué)習(xí) 模型 感興趣 的同學(xué), 歡迎關(guān)注 算法全棧之路 的 公眾號(hào) 接下來 逐步更新 的 模型手把手系列 的文章~
本篇是 模型手把手系列 的第二篇文章,本系列的上一篇文章 ?模型手把手系列開篇 之 python、spark 和 java 生成TFrecord 中 我們 主要說明 了 如何用 多種方式 生成 tensorflow 官方推薦 的 數(shù)據(jù)格式 tfrecord 的 方法,而 本章 我們則 將繼續(xù) 看看 tensorflow 如何 讀取 各種類型的特征,特別是序列特征數(shù)據(jù) ,并使用 多種方法生成 batch 訓(xùn)練樣本,代碼 涵蓋 tensorflow 1.x 系列 和 tensorflow 2.x 系列 的 方法,走過 路過 不能錯(cuò)過 哦 !
作者 花了 大量時(shí)間 來 整理 本文章里 介紹的 多種方法 的源碼,是因?yàn)?在 當(dāng)初寫 圖算法相關(guān)的文章之 圖上 deepwalk 算法理論與實(shí)戰(zhàn),圖算法之瑞士軍刀篇(一) 以及 圖上 deepwalk 算法理論 與 tensorflow keras 實(shí)戰(zhàn),圖算法之瑞士軍刀篇(二) 這 兩篇 文章的時(shí)候, 小小的 batch 數(shù)據(jù)生成,坑死了我這個(gè)混跡于國內(nèi)外互聯(lián)網(wǎng)大廠多年的算法老同志~ ,有些問題 沒遇到不算事,遇到了 找bub 真是要了我的小命了。 閑言少敘 ,就看文章干不干,轉(zhuǎn)需吧 ~
本文主要講解了 6 種 用tensorflow 1.x / 2.x 如何讀取 訓(xùn)練數(shù)據(jù),特別是 序列特征數(shù)據(jù) 的 處理方法。因?yàn)?這些方法 有著 各自的應(yīng)用場景 和 各自使用特點(diǎn),算是 對 上次遇坑 的報(bào)復(fù)性解決心理 吧,這里 我 全部 把開發(fā)列舉 出來了,希望 可以 切實(shí)的幫助到 同樣 遇到問題的老哥 。
老話說得好: 代碼是表達(dá)程序員思想的最好語言。本文的 數(shù)據(jù)讀入代碼 ,刻意 剖析了 使用 tensorflow 多種方法讀入 用戶歷史行為序列特征 的過程 ,代碼 每個(gè) 單元cell 均可以 獨(dú)立完美 運(yùn)行成功,具有極高的 參考價(jià)值 哦。詳細(xì)內(nèi)容 直接 看代碼 吧!~~
(1)代碼時(shí)光
本文共介紹了 6種 tensorflow 讀取數(shù)據(jù) ?并 batch 訓(xùn)練 的方法,包括使用 slice_input_producer、from_tensor_slices、generate、interleave
以及 自定義 生成batch 數(shù)據(jù) 等方法,下面就讓我們 一種一種方法 的 介紹吧,總有一種適合你的。
(1.0) 數(shù)據(jù)準(zhǔn)備
本文 用到的 數(shù)據(jù),從 內(nèi)存中 讀取csv的,我們在這里直接列出; 而使用到 tfrecord 的,我們則使用的上文 模型手把手系列開篇 之 python、spark 和 java 生成TFrecord ?中python方法單機(jī)版生成的tfrecord 數(shù)據(jù)。
@?歡迎關(guān)注作者公眾號(hào)?算法全棧之路
import?pandas?as?pd?
raw_df?=?pd.DataFrame([[28,12.1,'male',[1,2],1],?[30,8.7,?'female',[3,4,5],0],?[32,24.6,'female',[6,7,8,9,10],1]],?columns=['age',?'price','sex','click_list','label'])
#?序列特征長度不夠填充,使用?tf.train.batch?生成?batch?必須要定長序列?
max_len=5?
padding_value=0?
raw_df['click_list']?=?raw_df['click_list'].apply(lambda?x:?x?+?[padding_value]*(max_len-?len(x)))
raw_df['click_list_str']?=?raw_df['click_list'].apply(lambda?x:?'#'.join(map(str,?x)))
#?普通特征處理
raw_df['age']?=?raw_df['age'].astype(str)
raw_df['sex']?=?raw_df['sex'].astype(str)
raw_df['label']?=?raw_df['label'].astype(str)
print(raw_df)
raw_df.to_csv("read_sample.csv",sep='\t',index=False)
代碼 很 簡單,我就 不贅述 了。
中間要 注意的是:click_list 這一列特征就是 序列特征 ,每個(gè)用戶 的 歷史行為序列 的長度 并非定長 ,但是在某些方法里, 生成batch特征的時(shí)候,要求list 類型的數(shù)據(jù)是定長的 ,所以 我這里 用 默認(rèn)值 0 進(jìn)行了 padding 填充 。
(1.1)tensorflow 1.x 使用 slice_input_producer 生成 batch 數(shù)據(jù)
看代碼吧。
@?歡迎關(guān)注作者公眾號(hào)?算法全棧之路
import?tensorflow.compat.v1?as?tf
tf.compat.v1.disable_eager_execution()
#?創(chuàng)建輸入數(shù)據(jù)隊(duì)列
input_queue?=?tf.train.slice_input_producer(
????[raw_df['age'].to_list(),?raw_df['price'].to_list(),raw_df['sex'].to_list(),?raw_df['click_list'].to_list(),raw_df['label'].to_list()],
????shuffle=True
)
#?讀取隊(duì)列中的數(shù)據(jù)
all_sample_count?=?len(raw_df)
batch_size?=?2
num_threads?=?1
min_after_dequeue?=?1
all_feature_batch?=?tf.train.batch(
????input_queue,
????batch_size=batch_size,
????num_threads=num_threads,
????capacity=min_after_dequeue?+?(num_threads?+?1)?*?batch_size
)
#?打印輸出結(jié)果
with?tf.Session()?as?sess:
????#?初始化變量
????sess.run(tf.global_variables_initializer())
????#?啟動(dòng)隊(duì)列操作
????coord?=?tf.train.Coordinator()
????threads?=?tf.train.start_queue_runners(coord=coord)
????for?i?in?range(all_sample_count//batch_size):
????????age_batch,?price_batch,?sex_batch,click_list_batch,label_batch?=?sess.run(all_feature_batch)
????????print(f"age_batch:?{age_batch}\n?price_batch:?{price_batch}\n?sex_batch:?{sex_batch}?n?click_list_batch:?{click_list_batch}?n?label_batch:?{label_batch}?")
????coord.request_stop()
????coord.join(threads)
這里這里的 tf 是 import tensorflow.compat.v1 as tf
,適配于 tensorflow 1.x 系列 的模型。
這里主要用了 tf.train.slice_input_producer 和 tf.train.batch
數(shù)據(jù)來 生成batch 數(shù)據(jù)。
還是 重點(diǎn)說下序列特征 列 click_list_batch ,這里 讀入的 是一個(gè) 歷史點(diǎn)擊行為序列ID ?list,是 定長的 int 型 。定長 那就好辦了呀 ,直接 接embeding matrix 拿到每個(gè) id 對應(yīng) 的 embeding 然后扔進(jìn)模型里去。
這個(gè) cell 里 的 代碼是可以 跑通的,如果確實(shí)幫助到你了,歡迎 關(guān)注作者的公眾號(hào) 湊個(gè)份子~
(1.2)tensorflow 2.0 直接使用 from_tensor_slices 生成 batch 數(shù)據(jù)
@?歡迎關(guān)注作者公眾號(hào)?算法全棧之路
import?tensorflow?as?tf
tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())
import?pandas?as?pd
batch_size?=?3
max_len=5
raw_df['click_list']?=?raw_df['click_list'].apply(lambda?x:?'#'.join(map(str,?x)))
raw_df['age']?=?raw_df['age'].astype(str)
raw_df['price']?=?raw_df['price'].astype(str)
dataset?=?tf.data.Dataset.from_tensor_slices((raw_df[['age',?'price',?'sex',?'click_list']].values,?raw_df['label'].values))
dataset?=?dataset.shuffle(buffer_size=len(raw_df)).batch(batch_size)
#?Iterate?over?the?batches
for?batch?in?dataset:
????features,?labels?=?batch
????#?定位到?序列特征所在位置?
????str_list_batch?=?features[:,3:4]
????list_feature=tf.strings.split(str_list_batch,"#")
????#?輸出是一個(gè)SparseTensorValue對象
????#?https://blog.csdn.net/ustbbsy/article/details/116644136
????print("ccccc:",list_feature.values)
????print(list_feature.shape)
????
????print('Features:',?features)
????#?另一種定位序列特征的方式?
????print('Features(1):',?features[1][3])
????print('Labels:',?labels)
????print()
注意: 因?yàn)?我本機(jī)mac 的 tensorflow 版本是 2.6.0 的版本,所以這里tf默認(rèn)就是2.6.0了。
我們 可以使用
tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())?
來確認(rèn) 是否啟動(dòng)了 tensorflow 2.x系列的 eager 模式 。
這里還是 重點(diǎn)說一些 序列特征吧,這里讀入的是 把序列特征拼接成一個(gè)字符串 ,然后在對 每個(gè)batch里 進(jìn)行 字符串的分割,我們 這里用到的 方法 是 :
?str_list_batch?=?features[:,3:4]
?list_feature=tf.strings.split(str_list_batch,"#")
注意 tf.strings.split
的 返回是一個(gè) SparseTensorValue
對象, .values
屬性可以拿到具體的值。
因?yàn)槭?把序列特征拼接成了字符串,所以 我們這里 不要求序列長度是定長 的,非定長的序列特征處理 得到 SparseTensorValue
之后,我們可以使用 tf.Variable 或 tf.keras.layers.Embedding
來創(chuàng)建該嵌入矩陣。
最后,我們可以使用 tf.nn.embedding_lookup_sparse()
函數(shù) 來獲取 嵌入向量。
最后在強(qiáng)調(diào)一點(diǎn) 就是:對于支持 eager模式的 dataset, 我們可以直接用for循環(huán)以及dict 來獲取對應(yīng)特征的取值 哦,非常方便,非常強(qiáng)大 ,使用前 注意確認(rèn) eager模式 是否開啟。
(1.3)使用 dataset 的 generate 生成 batch 數(shù)據(jù)
對于 數(shù)據(jù)量不太大 的訓(xùn)練數(shù)據(jù),很多同學(xué) 還是 習(xí)慣 使用 python 的 yeild 來 構(gòu)建generator , 所以 我們 也提供 了 基于 generator 來 生成 batch 樣本 的方法,看代碼吧 ~
@?歡迎關(guān)注作者公眾號(hào)?算法全棧之路
import?tensorflow?as?tf
import?pandas?as?pd
import?numpy?as?np
#?創(chuàng)建一個(gè)虛擬的?pandas?dataframe
df?=?pd.DataFrame({
????'float_col':?np.random.rand(3),
????'int_col':?np.random.randint(0,?10,?size=(3)),
????'str_col':?['string{}'.format(i)?for?i?in?range(3)],
????'list_col':?[[i,?i+1]?for?i?in?range(3)]
})
print(df)
#?創(chuàng)建一個(gè)生成器函數(shù),用于將?pandas?dataframe?轉(zhuǎn)換為?Tensorflow?數(shù)據(jù)集
def?generator():
????for?index,?row?in?df.iterrows():
????????yield?(
????????????{
????????????????'float_input':?row['float_col'],
????????????????'int_input':?row['int_col'],
????????????????'str_input':?row['str_col'],
????????????????'list_input':?row['list_col']
????????????},
????????????row['int_col']??#?將?int_col?作為標(biāo)簽
????????)
#?創(chuàng)建?Tensorflow?數(shù)據(jù)集
dataset?=?tf.data.Dataset.from_generator(generator,?
?????????????????????????????????????????output_signature=(
?????????????????????????????????????????????{
?????????????????????????????????????????????????'float_input':?tf.TensorSpec(shape=(),?dtype=tf.float32),
?????????????????????????????????????????????????'int_input':?tf.TensorSpec(shape=(),?dtype=tf.int32),
?????????????????????????????????????????????????'str_input':?tf.TensorSpec(shape=(),?dtype=tf.string),
?????????????????????????????????????????????????'list_input':?tf.TensorSpec(shape=(2,),?dtype=tf.int32)
?????????????????????????????????????????????},
?????????????????????????????????????????????tf.TensorSpec(shape=(),?dtype=tf.int32)
?????????????????????????????????????????))
#?對數(shù)據(jù)進(jìn)行批次處理
batch_size?=?8
dataset?=?dataset.batch(batch_size)
#?打印數(shù)據(jù)集中的第一個(gè)批次
for?feature_batch,?label_batch?in?dataset:
????print('float_input:',?feature_batch['float_input'])
????print('int_input:',?feature_batch['int_input'])
????print('str_input:',?feature_batch['str_input'])
????print('list_input:',?feature_batch['list_input'])
????print('label:',?label_batch)
這里的 重點(diǎn) 依然是 序列特征的處理 ,對于 定長 以及 非定長 的 序列特征,本文前面均 進(jìn)行了 說明,這里 我就不在 強(qiáng)調(diào) 了,往上翻去 找找 就可以 看到哦。
(1.4)使用dataset 的 interleave 接口去讀取 txt 樣本文本文件
接下來 要介紹 的 兩種方法,才是 我們在 工業(yè)上 大數(shù)據(jù)場景下 實(shí)際使用的 非常多的 特征數(shù)據(jù) 讀入方法,看代碼吧~
@?歡迎關(guān)注作者公眾號(hào)?算法全棧之路
import?tensorflow?as?tf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)
#?訓(xùn)練集所有的列
TRAIN_SET_ALL_COLUMNS=["age",?"price",?"sex",?"click_list",?"label",?"click_list_str"]
#?沒有用到的列,這里把去掉?
TRAIN_SET_USELESS_COLUMN_NAMES=['click_list']
#?并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2
def?parse_txt_line(line,?label_dtype):
????if?label_dtype?==?tf.dtypes.float32:
????????label_default_value?=?0.0
????else:
????????label_default_value?=?0
????#?int64類型的默認(rèn)值,用long(0)也不好使,要設(shè)置一個(gè)真正大于int32的數(shù)值
????#?默認(rèn)值個(gè)數(shù)必須和讀入個(gè)數(shù)一致,很重要?ValueError:?not?enough?values?to?unpack?(expected?12,?got?4)
????#?整數(shù)默認(rèn)是?[1?<<?32]
????#?默認(rèn)值很重要,格式不對會(huì)導(dǎo)致這個(gè)問題
????#?ValueError:?Column?dtype?and?SparseTensors?dtype?must?be?compatible.?key:?adid,?column?dtype:
????#?<dtype:?'string'>,?tensor?dtype:?<dtype:?'int64'>
????field_defaults?=?[?[""],?[""],?[""],?[""],[label_default_value],[""]]
????#?從csv格式中解析出這些字段
????age,?price,?sex,?click_list,?label,?click_list_str?=?tf.io.decode_csv(line,?field_defaults,?field_delim="\t")
????#?對一些字段使用?tf.cast?進(jìn)行類型轉(zhuǎn)換,這里完全不需要,下游有進(jìn)行hash
????#?adid?=?tf.cast(adid,?tf.dtypes.int32)
????#?|?號(hào)分隔,??tf.strings.to_number?把字符串轉(zhuǎn)化為默認(rèn)浮點(diǎn)數(shù)
????#?user_click_seq?=?tf.strings.to_number(tf.strings.split(user_click_seq,?sep="|"))
????label?=?tf.cast(label,?tf.int64)
????fields_values?=?[age,?price,?sex,?click_list,?label,?click_list_str]
????features?=?dict(zip(TRAIN_SET_ALL_COLUMNS,?fields_values))
????#?沒有用到de列,需要pop出去
????for?useless_column_name?in?TRAIN_SET_USELESS_COLUMN_NAMES:
????????features.pop(useless_column_name)
????label?=?features.pop("label")
????#?返回一個(gè)dict{feature_name,value}?和?label
????return?features,?label
def?get_text_dataset(data_set_path_list,?label_dtype):
????filenames_dataset?=?tf.data.Dataset.from_tensor_slices(data_set_path_list)
????raw_dataset?=?filenames_dataset.interleave(
????????#?2個(gè)線程并行去讀??TextLineDataset
????????lambda?x:?tf.data.TextLineDataset(x,?num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
????????#?NUM_PARALLEL_FOR_DATASET=2
????????cycle_length=NUM_PARALLEL_FOR_DATASET,
????????block_length=BATCH_SIZE,
????????num_parallel_calls=NUM_PARALLEL_FOR_DATASET
????)
????raw_dataset?=?raw_dataset.?\
????????map(lambda?line:?parse_txt_line(line,?label_dtype),?num_parallel_calls=NUM_PARALLEL_FOR_DATASET).?\
????????apply(tf.data.experimental.ignore_errors())
????#?格式?dict(fea_name,value)?,?label
????return?raw_dataset
train_set_path_list=["read_sample.csv"]
train_raw_dataset?=?get_text_dataset(train_set_path_list,?label_dtype=tf.dtypes.int64)
for?feature_batch,?label_batch?in?train_raw_dataset:
????????print(feature_batch['age'])
????????print(label_batch)
這里的 代碼 是 工業(yè)大數(shù)據(jù)場景下 常用的方法,我們使用 tf.data.Dataset.from_tensor_slices
接口,一般會(huì) 先使用 tf.io.gfile
相關(guān)的接口 讀取到 hdfs 大數(shù)據(jù)集群上的 文件路徑 ,然后 tf.data.TextLineDataset
去 并行讀取,這里 的 方法 主要調(diào)用了 ?parse_txt_line
這個(gè)方法 來解析單行 的 樣本文件。
這里的 序列特征,我們可以在 parse_txt_line
用 python方法 把處理成 list 數(shù)據(jù),但是 要求定長,具體方法 看本文 開始的時(shí)候 的處理方法。當(dāng)然,也可以在 獲得 batch 得時(shí)候 用 ?tf.strings.split
進(jìn)行處理,和上面開篇 第二種 方法一樣。
更近一步,甚至 我們可以 將 序列特征字符串 一直 放到 模型里 去處理 都是可以的。
(1.5) 使用dataset 的 interleave 接口去讀取 tfrecord 文件
這個(gè)方法是 企業(yè)級(jí)機(jī)器學(xué)習(xí)pipline 處理大數(shù)據(jù)量下 模型訓(xùn)練 用到最多 的方法,甚至 tfrecord 能夠 兼容語音圖像 等格式,這一塊 感興趣 的 同學(xué)自己下去 查看資料 吧,我們 這里 主要介紹的 都是 數(shù)值以及字符串列表 等 搜廣推算法 更多用到 的 特征數(shù)據(jù)。
@?歡迎關(guān)注作者公眾號(hào)?算法全棧之路
import?tensorflow?as?tf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)
#?并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2
def?get_tf_record_dataset(data_set_path_list,shuffle=True):
????files?=?tf.data.Dataset.list_files(data_set_path_list,?shuffle=shuffle)
????dataset?=?files.apply(
????????tf.data.experimental.parallel_interleave(
????????????lambda?x:?tf.data.TFRecordDataset(x,?num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
????????????cycle_length=NUM_PARALLEL_FOR_DATASET,
????????????block_length=BATCH_SIZE,
????????????sloppy=False
????????)
????)
????
????#?parsing_spec?是一個(gè)字典,?它提供了每個(gè)特征到?"FixedLenFeature"?或?"VarLenFeature"?的映射
????parsing_spec?=?{
????????'age':?tf.io.FixedLenFeature([1],?tf.int64),
????????'price':?tf.io.FixedLenFeature([1],tf.float32),
????????'gender':?tf.io.FixedLenFeature([1],?tf.string),
????????'click_list':?tf.io.VarLenFeature(tf.int64),
????????'label':?tf.io.FixedLenFeature([1],tf.int64)
????}
????
????def?read_batch(serialized):
????????
????????feature?=?tf.io.parse_example(serialized,?features=parsing_spec)
????????label?=?feature['label']
????????return?feature,?{"label":?label}
????raw_tfrecord_data?=?dataset.map(read_batch,?NUM_PARALLEL_FOR_DATASET)
????#?格式?dict(fea_name,value),?label
????return?raw_tfrecord_data
train_set_path_list=["py_tf_record"]
train_raw_dataset?=?get_tf_record_dataset(train_set_path_list)
for?feature_batch,?label_batch?in?train_raw_dataset:
????????print("age:",feature_batch['age'])
????????#?這里的?click_list?返回的是一個(gè)?SparseTensor,?用?.values?方法可以得到值。
????????print("click_list:",feature_batch['click_list'].values)
????????print('label:',label_batch)
特別推薦 這里介紹 的 處理數(shù)據(jù) 的方法,將訓(xùn)練數(shù)據(jù) 保存為 tfrecord 格式,不僅 速度快 而且 節(jié)省存儲(chǔ) 空間,對 生成 tfrecord 數(shù)據(jù) 不熟悉的 同學(xué),可以 去看 作者的 上一篇 文章 模型手把手系列開篇 之 python、spark 和 java 生成TFrecord 。
這里 重點(diǎn)要強(qiáng)調(diào) 的是 ?parsing_spec
和 read_batch
方法,parsing_spec 中 定義來 定長和變長 tfrecord 數(shù)據(jù) 的 解析方法,非常優(yōu)秀,讀出來得 序列特征 是 變長的 SparseTensor
, 后面 處理得到 embeding
的方法,可與參考 上面文章 介紹的 SparseTensor 得到 embeding 得 部分內(nèi)容 哦,這里我 就也 不再贅述 了。
本文 到這里 ,我們 共介紹了 5 種 tensorflow 讀取數(shù)據(jù) 的方法,后兩種 為 工業(yè)大數(shù)據(jù)模型 訓(xùn)練場景下 的 算法利器,強(qiáng)烈推薦。
加上 圖上 deepwalk 算法理論與實(shí)戰(zhàn),圖算法之瑞士軍刀篇(一) ??文章里使用的 自定義生成 batch 數(shù)據(jù) 的方法,共有 ?6種方法 來 適配不同的 業(yè)務(wù)數(shù)據(jù) 讀取場景了,可以 算是 集 tensorflow 讀取數(shù)據(jù) 的 大成之作了,每一個(gè) 小節(jié) 的 代碼 均可以 獨(dú)立運(yùn)行成功,非常 值得收藏!
到這里, 模型手把手系列開篇 之 ?tensorflow 六種方法讀入batch樣本(含序列特征處理), 踩坑經(jīng)驗(yàn)值得收藏 ?的 全文 就 寫完 了。本文代碼 每個(gè)模塊 均可以 獨(dú)立跑 成功,總有一款 適合你,希望 可以對你 有 參考作用 ~
碼字不易,覺得有收獲就動(dòng)動(dòng)小手轉(zhuǎn)載一下吧,你的支持是我寫下去的最大動(dòng)力 ~
更多更全更新內(nèi)容,歡迎關(guān)注作者的公眾號(hào): 算法全棧之路

- END -