```scala
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val mat: RowMatrix = ...

// Compute the top 10 principal components.
// Principal components are stored in a local dense matrix.
val pc: Matrix = mat.computePrincipalComponents(10)

// Project the rows to the linear space spanned by the top 10 principal components.
val projected: RowMatrix = mat.multiply(pc)
```
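To run the snippet end to end, here is a minimal sketch, assuming a Spark shell with a live SparkContext `sc`; the data values and the choice of k = 2 are made up for illustration:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hypothetical 4 x 3 data set; any RDD[Vector] works.
val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(2.0, 1.0, 0.0),
  Vectors.dense(4.0, 0.0, 1.0),
  Vectors.dense(3.0, 5.0, 2.0)
))

val mat = new RowMatrix(data)

// Keep the top 2 components (k must not exceed the number of columns).
val pc = mat.computePrincipalComponents(2)

// Each original row is projected onto the 2-dimensional principal subspace.
val projected = mat.multiply(pc)
projected.rows.collect().foreach(println)
```

Each row of `projected` gives the coordinates of the corresponding input row in the k-dimensional space spanned by the principal components.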
Implementation
Principal component analysis is implemented in RowMatrix. The source code is as follows:
```scala
def computePrincipalComponents(k: Int): Matrix = {
  val n = numCols().toInt

  // Compute the covariance matrix.
  val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]

  // Eigenvalue decomposition, done via SVD (the covariance matrix is symmetric).
  val brzSvd.SVD(u: BDM[Double], _, _) = brzSvd(Cov)

  if (k == n) {
    // All components requested: return U as-is.
    Matrices.dense(n, k, u.data)
  } else {
    // U is column-major, so the first n * k values are exactly the top k columns.
    Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
  }
}
```
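The key step is `val brzSvd.SVD(u, _, _) = brzSvd(Cov)`: because the covariance matrix is symmetric positive semi-definite, its SVD coincides with its eigendecomposition, so the columns of `u` are the eigenvectors ordered by decreasing eigenvalue, and the first k columns are the top-k principal components. A minimal Breeze sketch of the same idea, on a hypothetical 3 x 3 covariance matrix:

```scala
import breeze.linalg.{DenseMatrix => BDM, svd => brzSvd}

// Hypothetical symmetric positive definite "covariance" matrix.
val cov = BDM(
  (4.0, 2.0, 0.0),
  (2.0, 3.0, 1.0),
  (0.0, 1.0, 2.0))

// For a symmetric PSD matrix, the left singular vectors are its eigenvectors,
// ordered by decreasing singular value (= eigenvalue = explained variance).
val brzSvd.SVD(u, s, _) = brzSvd(cov)

// Top-2 principal directions: the first two columns of u.
val topK = u(::, 0 to 1)
println(s)     // singular values, largest first
println(topK)  // n x k matrix of principal components
```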
```scala
def computeCovariance(): Matrix = {
  val n = numCols().toInt
  checkNumColumns(n)

  // One pass over the rows: accumulate the row count and the column sums.
  val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))(
    seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze),
    combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2)
  )

  updateNumRows(m)

  // Turn the column sums into column means.
  mean :/= m.toDouble

  // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
  // large but Cov(X, Y) is small, but it is good for sparse computation.
  // TODO: find a fast and stable way for sparse data.
  val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]

  // Rescale the Gramian in place into the sample covariance matrix,
  // filling only the upper triangle and mirroring it (G is symmetric).
  var i = 0
  var j = 0
  val m1 = m - 1.0
  var alpha = 0.0
  while (i < n) {
    alpha = m / m1 * mean(i)
    j = i
    while (j < n) {
      val Gij = G(i, j) / m1 - alpha * mean(j)
      G(i, j) = Gij
      G(j, i) = Gij
      j += 1
    }
    i += 1
  }

  Matrices.fromBreeze(G)
}
```
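The double loop turns the Gramian G = AᵀA into the unbiased sample covariance using Cov(X, Y) = (Σ xᵢyᵢ − m·x̄·ȳ) / (m − 1), i.e. G(i, j) / (m − 1) − m / (m − 1) · mean(i) · mean(j); the m / (m − 1) factor is Bessel's correction. A small self-contained Breeze check of that identity against the direct mean-centered definition (the matrix values are made up):

```scala
import breeze.linalg.{sum, DenseMatrix, DenseVector}

// Hypothetical 4 x 2 data matrix: 4 rows (samples), 2 columns (features).
val a = DenseMatrix(
  (1.0, 2.0),
  (2.0, 1.0),
  (4.0, 0.0),
  (3.0, 5.0))
val m = a.rows.toDouble

// Column means E[X].
val mean = DenseVector.tabulate(a.cols)(j => sum(a(::, j)) / m)

// Gramian-based formula used by computeCovariance:
//   Cov(i, j) = G(i, j) / (m - 1) - m / (m - 1) * mean(i) * mean(j)
val g = a.t * a
val covFast = DenseMatrix.tabulate(a.cols, a.cols) { (i, j) =>
  g(i, j) / (m - 1.0) - m / (m - 1.0) * mean(i) * mean(j)
}

// Direct definition on mean-centered data; both matrices should agree.
val centered = DenseMatrix.tabulate(a.rows, a.cols)((i, j) => a(i, j) - mean(j))
val covDirect = (centered.t * centered) / (m - 1.0)

println(covFast)
println(covDirect)
```

Computing the covariance from the Gramian avoids materializing a mean-centered copy of the data, which would destroy sparsity; the comment in the source notes the price is reduced numerical accuracy when E[X * Y] is large relative to Cov(X, Y).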