import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition

// Usage example: factor a distributed RowMatrix with computeSVD.
// `...` is the example's placeholder; substitute an existing RowMatrix here.
val mat: RowMatrix = ...

// Compute the top 20 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(20, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
// Pick the SVD strategy from the requested `mode` string.
// "auto" chooses a strategy from k (requested singular values) and n as used below;
// the other recognised strings select a strategy directly.
val computeMode = mode match {
  case "auto" =>
    if (k > 5000) {
      logWarning(s"computing svd with k=$k and n=$n, please check necessity")
    }
    if (n < 100 || (k > n / 2 && n <= 15000)) {
      // When this condition holds, compute the square matrix first and then compute
      // the eigenvalues locally, which avoids data transfer.
      if (k < n / 3) {
        SVDMode.LocalARPACK
      } else {
        SVDMode.LocalLAPACK
      }
    } else {
      // Distributed implementation.
      SVDMode.DistARPACK
    }
  case "local-svd" => SVDMode.LocalLAPACK
  case "local-eigs" => SVDMode.LocalARPACK
  case "dist-eigs" => SVDMode.DistARPACK
  // Fail with a descriptive error instead of an opaque MatchError on an unknown mode.
  case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
}
2 特征值分解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Compute the pair (sigmaSquares, u) using the strategy selected in computeMode.
// NOTE(review): symmetricEigs presumably returns (eigenvalues, eigenvectors) of the
// operator it is given — confirm against EigenValueDecomposition.
val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match {
  case SVDMode.LocalARPACK =>
    // Materialise the Gramian matrix locally and hand the solver a local multiply.
    val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
    EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
  case SVDMode.LocalLAPACK =>
    // breeze (v0.10) svd latent constraint, 7 * n * n + 4 * n < Int.MaxValue
    val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
    // Run Breeze's svd on G and keep its first two factors; the third is discarded.
    val brzSvd.SVD(uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
    (sigmaSquaresFull, uFull)
  case SVDMode.DistARPACK =>
    if (rows.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data is not directly cached, which may hurt performance if its" +
        " parent RDDs are also uncached.")
    }
    // The solver multiplies through multiplyGramianMatrixBy instead of a local G.
    EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
}
// Obtain the vector of singular values as the element-wise square root of sigmaSquares.
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)

// Determine how many leading singular values are at least rCond * sigmas(0).
val sigma0 = sigmas(0)
val threshold = rCond * sigma0
var i = 0
// sigmas may be shorter than k,
// so use i < min(k, sigmas.length) instead of i < k.
if (sigmas.length < k) {
  logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.")
}
while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) {
  i += 1
}
val sk = i

if (sk < k) {
  logWarning(s"Requested $k singular values but only found $sk nonzeros.")
}

// Compute s, i.e. sigma: the first sk singular values.
val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))

// Compute V from the first n * sk entries of u's backing array.
// NOTE(review): assumes u has n rows stored column-major, so this is its first sk columns — confirm.
val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))

// Compute U.
// N = Vk * Sk^{-1}
val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
// Distinct index names: `i` is already declared above for the rank scan.
var row = 0
var col = 0
while (col < sk) {
  row = 0
  val sigma = sigmas(col)
  while (row < n) {
    // The inverse of a diagonal matrix is the reciprocal of each diagonal entry.
    N(row, col) /= sigma
    row += 1
  }
  col += 1
}

// U = A * N
val U = this.multiply(Matrices.fromBreeze(N))