/* ----------------------------------------------------------------------- *//** * * @file sparse_linear_systems.cpp * * @brief sparse linear systems * * We implement sparse linear systems using the direct. * *//* ----------------------------------------------------------------------- */ #include #include #include #include #include "sparse_linear_systems.hpp" #include // Common SQL functions used by all modules //#include "sparse_linear_systems_states.hpp" namespace madlib { // Use Eigen using namespace dbal::eigen_integration; using namespace Eigen; namespace modules { // Import names from other MADlib modules using dbal::NoSolutionFoundException; namespace linear_systems{ // --------------------------------------------------------------------------- // Direct sparse Linear System States // --------------------------------------------------------------------------- // Function protofype of Internal functions AnyType direct_sparse_stateToResult( const Allocator &inAllocator, const ColumnVector &x, const double residual); /** * @brief States for linear systems * * TransitionState encapsualtes the transition state during the * logistic-regression aggregate function. To the database, the state is * exposed as a single DOUBLE PRECISION array, to the C++ code it is a proper * object containing scalars and vectors. * * Note: We assume that the DOUBLE PRECISION array is initialized by the * database with length at least 5, and all elemenets are 0. * */ template class SparseDirectLinearSystemTransitionState { template friend class SparseDirectLinearSystemTransitionState; public: SparseDirectLinearSystemTransitionState(const AnyType &inArray) : mStorage(inArray.getAs()) { rebind( static_cast(mStorage[1]), static_cast(mStorage[2])); } /** * @brief Convert to backend representation * * We define this function so that we can use State in the * argument list and as a return type. */ inline operator AnyType() const { return mStorage; } /** * @brief Initialize the conjugate-gradient state. * * This function is only called for the first iteration, for the first row. */ inline void initialize(const Allocator &inAllocator, uint32_t innumVars, uint32_t innumEquations, uint32_t inNNZA ) { // Array size does not depend in numVars mStorage = inAllocator.allocateArray(arraySize(innumEquations, inNNZA)); rebind(innumEquations, inNNZA); numVars = innumVars; numEquations = innumEquations; NNZA = inNNZA; } /** * @brief We need to support assigning the previous state */ template SparseDirectLinearSystemTransitionState &operator=( const SparseDirectLinearSystemTransitionState &inOtherState) { for (size_t i = 0; i < mStorage.size(); i++) mStorage[i] = inOtherState.mStorage[i]; return *this; } /** * @brief Merge with another State object by copying the intra-iteration * fields */ template SparseDirectLinearSystemTransitionState &operator+=( const SparseDirectLinearSystemTransitionState &inOtherState) { if (mStorage.size() != inOtherState.mStorage.size() || numVars != inOtherState.numVars || NNZA != inOtherState.NNZA|| numEquations != inOtherState.numEquations) throw std::logic_error("Internal error: Incompatible transition " "states"); b += inOtherState.b; b_stored += inOtherState.b_stored; // Merge state for sparse loading is not an array add operation // but it is an array-append operation for (uint32_t i=0; i < inOtherState.nnz_processed; i++){ r(nnz_processed + i) += inOtherState.r(i); c(nnz_processed + i) += inOtherState.c(i); v(nnz_processed + i) += inOtherState.v(i); } nnz_processed += inOtherState.nnz_processed; return *this; } /** * @brief Reset the inter-iteration fields. */ inline void reset() { nnz_processed = 0; r.fill(0); c.fill(0); v.fill(0); b.fill(0); b_stored.fill(0); } private: static inline size_t arraySize(const uint32_t innumEquations, const uint32_t inNNZA) { return 5 + 3 * inNNZA + 2 * innumEquations; } /** * @brief Rebind to a new storage array * * @param innumVars The number of independent variables. * * Array layout (iteration refers to one aggregate-function call): * Inter-iteration components (updated in final function): * - 0: numVars (Total number of variables) * - 1: numEquations (Total number of equations) * - 2: nnZ (Total number of non-zeros) * - 3: algorithm * - 4: nnz_processed Number of non-zeros processed by a node * - 4: b (RHS vector) * - 4 + 1 * numEquations: r (LHS matrix row) * - 4 + 1 * numEquations + 1 * nnzA: c (LHS matrix column) * - 4 + 1 * numEquations + 2 * nnzA: v (LHS matrix value) */ void rebind(uint32_t innumEquations, uint32_t inNNZA) { numVars.rebind(&mStorage[0]); numEquations.rebind(&mStorage[1]); NNZA.rebind(&mStorage[2]); algorithm.rebind(&mStorage[3]); nnz_processed.rebind(&mStorage[4]); b.rebind(&mStorage[5], innumEquations); b_stored.rebind(&mStorage[5 + innumEquations], innumEquations); r.rebind(&mStorage[5 + 2*innumEquations], inNNZA); c.rebind(&mStorage[5 + 2*innumEquations + inNNZA], inNNZA); v.rebind(&mStorage[5 + 2*innumEquations + 2 * inNNZA], inNNZA); } Handle mStorage; public: typename HandleTraits::ReferenceToUInt32 numVars; typename HandleTraits::ReferenceToUInt32 numEquations; typename HandleTraits::ReferenceToUInt32 NNZA; typename HandleTraits::ReferenceToUInt32 nnz_processed; typename HandleTraits::ReferenceToUInt32 algorithm; typename HandleTraits::ColumnVectorTransparentHandleMap b_stored; typename HandleTraits::ColumnVectorTransparentHandleMap b; typename HandleTraits::ColumnVectorTransparentHandleMap r; typename HandleTraits::ColumnVectorTransparentHandleMap c; typename HandleTraits::ColumnVectorTransparentHandleMap v; }; /** * @brief Perform the sparse linear system transition step */ AnyType sparse_direct_linear_system_transition::run(AnyType &args) { SparseDirectLinearSystemTransitionState > state = args[0]; int32_t row_id = args[1].getAs(); int32_t col_id = args[2].getAs(); double value = args[3].getAs(); double _b = args[4].getAs(); // When the segment recieves the first non-zero in the sparse matrix // we initialize the state if (state.nnz_processed == 0) { int32_t num_equations = args[5].getAs(); int32_t num_vars = args[6].getAs(); int32_t total_nnz = args[7].getAs(); int algorithm = args[8].getAs(); state.initialize(*this, num_vars, num_equations, total_nnz ); state.algorithm = algorithm; state.NNZA = total_nnz; state.numVars = num_vars; state.numEquations = num_equations; state.b_stored.setZero(); state.b.setZero(); } // Now do the transition step // First we create a block of zeros in memory // and then add the vector in the appropriate position ColumnVector bvec(static_cast(state.numEquations)); ColumnVector rvec(static_cast(state.NNZA)); ColumnVector cvec(static_cast(state.NNZA)); ColumnVector vvec(static_cast(state.NNZA)); bvec.setZero(); rvec.setZero(); cvec.setZero(); vvec.setZero(); rvec(state.nnz_processed) = row_id; cvec(state.nnz_processed) = col_id; vvec(state.nnz_processed) = value; if (state.b_stored(row_id) == 0) { bvec(row_id) = _b; state.b_stored(row_id) = 1; state.b += bvec; } // Build the vector & matrices based on row_id state.r += rvec; state.c += cvec; state.v += vvec; state.nnz_processed++; return state; } /** * @brief Perform the perliminary aggregation function: Merge transition states */ AnyType sparse_direct_linear_system_merge_states::run(AnyType &args) { SparseDirectLinearSystemTransitionState > stateLeft = args[0]; SparseDirectLinearSystemTransitionState > stateRight = args[1]; // We first handle the trivial case where this function is called with one // of the states being the initial state if (stateLeft.numEquations == 0) return stateRight; else if (stateRight.numEquations == 0) return stateLeft; // Merge states together and return stateLeft += stateRight; return stateLeft; } /** * @brief Perform the linear system final step */ AnyType sparse_direct_linear_system_final::run(AnyType &args) { // We request a mutable object. Depending on the backend, this might perform // a deep copy. SparseDirectLinearSystemTransitionState > state = args[0]; // Aggregates that haven't seen any data just return Null. if (state.numEquations == 0) return Null(); // Eigen works better when you reserve the number of nnz // Note: When calling reserve(), it is not required that nnz is the // exact number of nonzero elements in the final matrix. However, an // exact estimation will avoid multiple reallocations during the // insertion phase. SparseMatrix A(state.numEquations, state.numVars); A.reserve(state.NNZA); ColumnVector x; for (uint32_t i=0; i < state.NNZA; i++){ A.insert(static_cast(state.r(i)), static_cast(state.c(i))) = state.v(i); } // Switch case needs scoping in C++ if you want to declare inside it // Unfortunately, this means that I have to write the code to call the // solver in each switch case separtely switch (state.algorithm) { case 1:{ Eigen::SimplicialLLT solver; solver.compute(A); x = solver.solve(state.b); break; } case 2:{ Eigen::SimplicialLDLT solver; solver.compute(A); x = solver.solve(state.b); break; } } // It is unclear whether I should do the residual computation in-memory or // in-database (as done in the dense linear systems case) ColumnVector bvec = state.b; double residual = (A*x-bvec).norm() / bvec.norm(); // Compute the residual return direct_sparse_stateToResult(*this, x, residual); } /** * @brief Helper function that computes the final statistics */ AnyType direct_sparse_stateToResult( const Allocator &inAllocator, const ColumnVector &x, const double residual) { MutableNativeColumnVector solution( inAllocator.allocateArray(x.size())); for (Index i = 0; i < x.size(); ++i) { solution(i) = x(i); } // Return all coefficients, standard errors, etc. in a tuple AnyType tuple; tuple << solution << residual << Null(); return tuple; } // --------------------------------------------------------------------------- // In-memory iterative sparse Linear System States // --------------------------------------------------------------------------- // Function protofype of Internal functions AnyType inmem_iterative_sparse_stateToResult( const Allocator &inAllocator, const ColumnVector &x, const int iters, const double residual_norm); /** * @brief States for linear systems * * TransitionState encapsualtes the transition state during the * logistic-regression aggregate function. To the database, the state is * exposed as a single DOUBLE PRECISION array, to the C++ code it is a proper * object containing scalars and vectors. * * Note: We assume that the DOUBLE PRECISION array is initialized by the * database with length at least 5, and all elemenets are 0. * */ template class SparseInMemIterativeLinearSystemTransitionState { template friend class SparseInMemIterativeLinearSystemTransitionState; public: SparseInMemIterativeLinearSystemTransitionState(const AnyType &inArray) : mStorage(inArray.getAs()) { rebind( static_cast(mStorage[1]), static_cast(mStorage[2])); } /** * @brief Convert to backend representation * * We define this function so that we can use State in the * argument list and as a return type. */ inline operator AnyType() const { return mStorage; } /** * @brief Initialize the conjugate-gradient state. * * This function is only called for the first iteration, for the first row. */ inline void initialize(const Allocator &inAllocator, uint32_t innumVars, uint32_t innumEquations, uint32_t inNNZA ) { // Array size does not depend in numVars mStorage = inAllocator.allocateArray(arraySize(innumEquations, inNNZA)); rebind(innumEquations, inNNZA); numVars = innumVars; numEquations = innumEquations; NNZA = inNNZA; } /** * @brief We need to support assigning the previous state */ template SparseInMemIterativeLinearSystemTransitionState &operator=( const SparseInMemIterativeLinearSystemTransitionState &inOtherState) { for (size_t i = 0; i < mStorage.size(); i++) mStorage[i] = inOtherState.mStorage[i]; return *this; } /** * @brief Merge with another State object by copying the intra-iteration * fields */ template SparseInMemIterativeLinearSystemTransitionState &operator+=( const SparseInMemIterativeLinearSystemTransitionState &inOtherState) { if (mStorage.size() != inOtherState.mStorage.size() || numVars != inOtherState.numVars || NNZA != inOtherState.NNZA|| numEquations != inOtherState.numEquations) throw std::logic_error("Internal error: Incompatible transition " "states"); b += inOtherState.b; b_stored += inOtherState.b_stored; // Merge state for sparse loading is not an array add operation // but it is an array-append operation for (uint32_t i=0; i < inOtherState.nnz_processed; i++){ r(nnz_processed + i) += inOtherState.r(i); c(nnz_processed + i) += inOtherState.c(i); v(nnz_processed + i) += inOtherState.v(i); } nnz_processed += inOtherState.nnz_processed; return *this; } /** * @brief Reset the inter-iteration fields. */ inline void reset() { nnz_processed = 0; r.fill(0); c.fill(0); v.fill(0); b.fill(0); b_stored.fill(0); } private: static inline size_t arraySize(const uint32_t innumEquations, const uint32_t inNNZA) { return 7 + 3 * inNNZA + 2 * innumEquations; } /** * @brief Rebind to a new storage array * * @param innumVars The number of independent variables. * * Array layout (iteration refers to one aggregate-function call): * Inter-iteration components (updated in final function): * - 0: numVars (Total number of variables) * - 1: numEquations (Total number of equations) * - 2: nnZ (Total number of non-zeros) * - 3: algorithm * - 4: maxIter * - 5: termToler * - 6: nnz_processed Number of non-zeros processed by a node * - 7: b (RHS vector) * - 7: b_stored (Boolean: Has the b_vector already been loaded?) * - 7 + 2 * numEquations: r (LHS matrix row) * - 7 + 2 * numEquations + 1 * nnzA: c (LHS matrix column) * - 7 + 2 * numEquations + 2 * nnzA: v (LHS matrix value) */ void rebind(uint32_t innumEquations, uint32_t inNNZA) { numVars.rebind(&mStorage[0]); numEquations.rebind(&mStorage[1]); NNZA.rebind(&mStorage[2]); algorithm.rebind(&mStorage[3]); nnz_processed.rebind(&mStorage[4]); maxIter.rebind(&mStorage[5]); termToler.rebind(&mStorage[6]); b.rebind(&mStorage[7], innumEquations); b_stored.rebind(&mStorage[7 + innumEquations], innumEquations); r.rebind(&mStorage[7 + 2*innumEquations], inNNZA); c.rebind(&mStorage[7 + 2*innumEquations + inNNZA], inNNZA); v.rebind(&mStorage[7 + 2*innumEquations + 2 * inNNZA], inNNZA); } Handle mStorage; public: typename HandleTraits::ReferenceToUInt32 numVars; typename HandleTraits::ReferenceToUInt32 numEquations; typename HandleTraits::ReferenceToUInt32 NNZA; typename HandleTraits::ReferenceToUInt32 nnz_processed; typename HandleTraits::ReferenceToUInt32 algorithm; typename HandleTraits::ReferenceToUInt32 maxIter; typename HandleTraits::ReferenceToDouble termToler; typename HandleTraits::ColumnVectorTransparentHandleMap b_stored; typename HandleTraits::ColumnVectorTransparentHandleMap b; typename HandleTraits::ColumnVectorTransparentHandleMap r; typename HandleTraits::ColumnVectorTransparentHandleMap c; typename HandleTraits::ColumnVectorTransparentHandleMap v; }; /** * @brief Perform the sparse linear system transition step */ AnyType sparse_inmem_iterative_linear_system_transition::run(AnyType &args) { SparseInMemIterativeLinearSystemTransitionState > state = args[0]; int32_t row_id = args[1].getAs(); int32_t col_id = args[2].getAs(); double value = args[3].getAs(); double _b = args[4].getAs(); // When the segment recieves the first non-zero in the sparse matrix // we initialize the state if (state.nnz_processed == 0) { int32_t num_equations = args[5].getAs(); int32_t num_vars = args[6].getAs(); int32_t total_nnz = args[7].getAs(); int algorithm = args[8].getAs(); int32_t maxIter = args[9].getAs(); double termToler = args[10].getAs(); state.initialize(*this, num_vars, num_equations, total_nnz ); state.algorithm = algorithm; state.NNZA = total_nnz; state.numVars = num_vars; state.numEquations = num_equations; state.maxIter = maxIter; state.termToler = termToler; state.b_stored.setZero(); state.b.setZero(); } // Now do the transition step // First we create a block of zeros in memory // and then add the vector in the appropriate position ColumnVector bvec(static_cast(state.numEquations)); ColumnVector rvec(static_cast(state.NNZA)); ColumnVector cvec(static_cast(state.NNZA)); ColumnVector vvec(static_cast(state.NNZA)); bvec.setZero(); rvec.setZero(); cvec.setZero(); vvec.setZero(); rvec(state.nnz_processed) = row_id; cvec(state.nnz_processed) = col_id; vvec(state.nnz_processed) = value; if (state.b_stored(row_id) == 0) { bvec(row_id) = _b; state.b_stored(row_id) = 1; state.b += bvec; } // Build the vector & matrices based on row_id state.r += rvec; state.c += cvec; state.v += vvec; state.nnz_processed++; return state; } /** * @brief Perform the perliminary aggregation function: Merge transition states */ AnyType sparse_inmem_iterative_linear_system_merge_states::run(AnyType &args) { SparseInMemIterativeLinearSystemTransitionState > stateLeft = args[0]; SparseInMemIterativeLinearSystemTransitionState > stateRight = args[1]; // We first handle the trivial case where this function is called with one // of the states being the initial state if (stateLeft.numEquations == 0) return stateRight; else if (stateRight.numEquations == 0) return stateLeft; // Merge states together and return stateLeft += stateRight; return stateLeft; } /** * @brief Perform the linear system final step */ AnyType sparse_inmem_iterative_linear_system_final::run(AnyType &args) { // We request a mutable object. Depending on the backend, this might perform // a deep copy. SparseInMemIterativeLinearSystemTransitionState > state = args[0]; // Aggregates that haven't seen any data just return Null. if (state.numEquations == 0) return Null(); // Eigen works better when you reserve the number of nnz // Note: When calling reserve(), it is not required that nnz is the // exact number of nonzero elements in the final matrix. However, an // exact estimation will avoid multiple reallocations during the // insertion phase. SparseMatrix A(state.numEquations, state.numVars); A.reserve(state.NNZA); ColumnVector x; for (uint32_t i=0; i < state.NNZA; i++){ A.insert(static_cast(state.r(i)), static_cast(state.c(i))) = state.v(i); } int iters = 0; double error = 0.; // Switch case needs scoping in C++ if you want to declare inside it // Unfortunately, this means that I have to write the code to call the // solver in each switch case separtely // Note: CG uses a diagonal preconditioner by default switch (state.algorithm) { case 1:{ Eigen::ConjugateGradient solver; solver.setTolerance(state.termToler); solver.setMaxIterations(static_cast(state.maxIter)); x = solver.compute(A).solve(state.b); iters = solver.iterations(); error = solver.error(); break; } case 2:{ Eigen::BiCGSTAB solver; solver.setTolerance(state.termToler); solver.setMaxIterations(static_cast(state.maxIter)); x = solver.compute(A).solve(state.b); iters = solver.iterations(); error = solver.error(); break; } // Preconditioned CG: Uses an incomplete LUT preconditioner // For the incomplete LUT preconditioner. You might want to see Eigen docs // for factors like fill-in which will make the algorithm more suitable // for handling tougher linear systgems case 3:{ // 3 Arguments in template // ConjugateGradient< _MatrixType, _UpLo, _Preconditioner >: Eigen::ConjugateGradient > solver; solver.setTolerance(state.termToler); solver.setMaxIterations(static_cast(state.maxIter)); x = solver.compute(A).solve(state.b); iters = solver.iterations(); error = solver.error(); break; } case 4:{ // 2 Arguments in template: // ConjugateGradient< _MatrixType, _UpLo, _Preconditioner >: // NOTE: BiCGSTAB is a bi-conjugate gradient that does not // require a lower or upper triangular option Eigen::BiCGSTAB > solver; solver.setTolerance(state.termToler); solver.setMaxIterations(static_cast(state.maxIter)); x = solver.compute(A).solve(state.b); iters = solver.iterations(); error = solver.error(); break; } } // Compute the residual return inmem_iterative_sparse_stateToResult(*this, x, iters, error); } /** * @brief Helper function that computes the final statistics */ AnyType inmem_iterative_sparse_stateToResult( const Allocator &inAllocator, const ColumnVector &x, const int iters, const double residual_norm) { MutableNativeColumnVector solution( inAllocator.allocateArray(x.size())); for (Index i = 0; i < x.size(); ++i) { solution(i) = x(i); } // Return all coefficients, standard errors, etc. in a tuple AnyType tuple; tuple << solution << residual_norm << iters; return tuple; } } // namespace linear_systems } // namespace modules } // namespace madlib