Spark的LSH示例代碼解析(java)
因為自己的項目需要,所以近期用到了Spark的這個LSH局部哈希算法工具,因為示例代碼沒啥解釋,所以花費了一些時間看官方文檔來理解并增加大量注釋,希望能給后人帶來一些幫助。
示例代碼地址:
https://spark.apache.org/docs/3.1.1/ml-features.html#lsh-algorithms

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
public class SimilarLSHUtil {
? ? // Spark在springboot中應(yīng)用的探索
? ? public void testFunction() {
? ? ? ? // SparkSession 是 Spark SQL 的入口,使用Dataset和DataFrame API編程Spark的入口點。
? ? ? ? // 使用 Dataset 或者 Datafram 編寫 Spark SQL 應(yīng)用的時候,第一個要創(chuàng)建的對象就是 SparkSession。
? ? ? ? SparkSession spark = SparkSession.builder()
? ? ? ? ? ? ? ? .master("local[*]")? // 設(shè)置要連接的Spark主URL,例如“ local”在本地運行,“ local [4]”在4核本地運行,或“ spark:// master:7077”在Spark獨立集群上運行。
? ? ? ? ? ? ? ? .appName("Spark")? ?// 設(shè)置應(yīng)用程序的名稱,該名稱將顯示在Spark Web UI中。
? ? ? ? ? ? ? ? .getOrCreate();? ? ?// 獲取一個現(xiàn)有的,SparkSession或者如果不存在,則根據(jù)此構(gòu)建器中設(shè)置的選項創(chuàng)建一個新的。
? ? ? ? List<Row> dataA = Arrays.asList(
? ? ? ? ? ? ? ? RowFactory.create(0, Vectors.dense(1.0, 1.0)), // Vectors.dense構(gòu)造密集向量,Vectors.sparse構(gòu)造稀疏向量
? ? ? ? ? ? ? ? RowFactory.create(1, Vectors.dense(1.0, -1.0)), // RowFactory.create依據(jù)給予的參數(shù)創(chuàng)建一個row
? ? ? ? ? ? ? ? RowFactory.create(2, Vectors.dense(-1.0, -1.0)), // RowFactory是一個構(gòu)造Row對象的工廠類
? ? ? ? ? ? ? ? RowFactory.create(3, Vectors.dense(-1.0, 1.0))
? ? ? ? );
? ? ? ? List<Row> dataB = Arrays.asList(
? ? ? ? ? ? ? ? RowFactory.create(4, Vectors.dense(1.0, 0.0)),
? ? ? ? ? ? ? ? RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
? ? ? ? ? ? ? ? RowFactory.create(6, Vectors.dense(0.0, 1.0)),
? ? ? ? ? ? ? ? RowFactory.create(7, Vectors.dense(0.0, -1.0))
? ? ? ? );
? ? ? ? // 一個StructType對象,可以有多個StructField,同時也可以用名字(name)來提取,就想當(dāng)于Map可以用key來提取value
? ? ? ? /*
? ? ? ? // Case Class(樣例類)是一種特殊的類,它們經(jīng)過優(yōu)化以被用于模式匹配。
? ? ? ? case class StructField(
? ? ? ? ? ? name: String,? ? ? ? ? ? ? ? ? ? ? ? // 此字段的名稱
? ? ? ? ? ? dataType: DataType,? ? ? ? ? ? ? ? ? // 此字段的數(shù)據(jù)類型
? ? ? ? ? ? nullable: Boolean = true,? ? ? ? ? ? // 指示此字段是否可以為null
? ? ? ? ? ? metadata: Metadata = Metadata.empty? // 元數(shù)據(jù),此字段的元數(shù)據(jù)
? ? ? ? ) {}
? ? ? ? */
? ? ? ? StructType schema = new StructType(new StructField[]{
? ? ? ? ? ? ? ? new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
? ? ? ? ? ? ? ? new StructField("features", new VectorUDT(), false, Metadata.empty())
? ? ? ? });
? ? ? ? // 數(shù)據(jù)集是特定于域的對象的強類型集合,可以使用功能或關(guān)系操作并行轉(zhuǎn)換它們。每個數(shù)據(jù)集還具有一個被稱為DataFrame的無類型視圖,即Row的一個數(shù)據(jù)集。
? ? ? ? Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
? ? ? ? Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
? ? ? ? BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
? ? ? ? ? ? ? ? .setBucketLength(2.0)? ? ?// 設(shè)置每個哈希存儲桶的長度,較大的存儲桶可降低誤報率。如果將輸入向量標(biāo)準(zhǔn)化,則pow(numRecords,-1 / inputDim)的1-10倍是合理的值
? ? ? ? ? ? ? ? .setNumHashTables(3)? ? ? // LSH OR擴增中使用的哈希表數(shù)量的參數(shù)。
? ? ? ? ? ? ? ? .setInputCol("features")? // 設(shè)置輸入列的名稱
? ? ? ? ? ? ? ? .setOutputCol("hashes");? // 設(shè)置輸出列的名稱
// LSH OR擴增中使用的哈希表數(shù)量的參數(shù).LSH OR放大可用于降低假陰性率。該參數(shù)的較高值導(dǎo)致降低的誤報率,但以增加的計算復(fù)雜性為代價。
// https://cloud.tencent.com/developer/article/1035600
// 使用numHashTables = 5,近似最近鄰的速度比完全掃描快2倍。在numHashTables = 3的情況下,近似相似連接比完全連接和過濾要快3-5倍。
? ? ? ? BucketedRandomProjectionLSHModel model = mh.fit(dfA); // fit(Dataset<?> dataset):使模型適合輸入數(shù)據(jù)
? ? ? ? // Feature Transformation 特征轉(zhuǎn)換
? ? ? ? // 散列值存儲在“hashes”列中的散列數(shù)據(jù)集
? ? ? ? System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
? ? ? ? // transform(Dataset<?> dataset): 轉(zhuǎn)換輸入數(shù)據(jù)集。
? ? ? ? model.transform(dfA).show();
? ? ? ? // 計算輸入行的位置敏感哈希,然后執(zhí)行近似相似加入
? ? ? ? // 我們可以通過傳入已轉(zhuǎn)換的數(shù)據(jù)集來避免計算哈希,例如:`model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
? ? ? ? System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
? ? ? ? model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance") // 連接兩個數(shù)據(jù)集以大致找到距離小于閾值的所有行對。threshold:閾值。返回:Dataset<?>
? ? ? ? ? ? ? ? .select(col("datasetA.id").alias("idA"), // select(Column... cols)選擇一組基于列的表達(dá)式。
? ? ? ? ? ? ? ? ? ? ? ? col("datasetB.id").alias("idB"), // col(String colName),根據(jù)列名稱選擇列,并將其作為返回Column。alias(String alias):為列指定別名。
? ? ? ? ? ? ? ? ? ? ? ? col("EuclideanDistance")).show(); // show()以表格形式顯示數(shù)據(jù)集的前20行。
? ? ? ? // 計算輸入行的位置敏感哈希,然后執(zhí)行近似最近相鄰搜索。
? ? ? ? // 我們可以通過傳入已轉(zhuǎn)換的數(shù)據(jù)集來避免計算哈希,例如 `model.approxNearestNeighbors(transformedA, key, 2)`
? ? ? ? System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
? ? ? ? Vector key = Vectors.dense(1.0, 0.0);
? ? ? ? // 大約在dfA中依據(jù)key搜索2個最近鄰居
? ? ? ? // approxNearestNeighbors(Dataset<?> dataset, Vector key, int numNearestNeighbors, String distCol) 給定一個大型數(shù)據(jù)集和一個項目,大約可以找到最多k個與該項目距離最近的項目。
? ? ? ? model.approxNearestNeighbors(dfA, key, 2).show();
? ? }
}

(以后有時間了,會再出一篇java下的Spark使用配置過程)
(以后有時間了,也會順便寫一個如何在java(特別是Springboot項目)代碼中應(yīng)用這個算法工具)

因為我不是搞算法的,只是用到了而已,所以可能部分地方存在認(rèn)知錯誤,求輕噴...