/* ----------------------------------------------------------------------- */ /** * * @file cox_proportional_hazards.cpp * * @brief Cox proportional hazards * * ----------------------------------------------------------------------- */ #include #include #include #include #include "marginal_cox.hpp" namespace madlib { namespace modules { namespace stats { // Use Eigen using namespace dbal::eigen_integration; // Import names from other MADlib modules using dbal::NoSolutionFoundException; using namespace std; // --------------------------------------------------------------------------- // Marginal Effects Cox Proportional Hazard State // --------------------------------------------------------------------------- /** * @brief State for marginal effects calculation for cox proportional hazards * * TransitionState encapsualtes the transition state during the * marginal effects calculation. * To the database, the state is exposed as a single DOUBLE PRECISION array, * to the C++ code it is a proper object containing scalars and vectors. * * Note: We assume that the DOUBLE PRECISION array is initialized by the * database with length at least 5, and all elemenets are 0. * */ template class MarginsCoxPropHazardState { template friend class MarginsCoxPropHazardState; public: MarginsCoxPropHazardState(const AnyType &inArray) : mStorage(inArray.getAs()) { rebind(static_cast(mStorage[0]), static_cast(mStorage[1]), static_cast(mStorage[2])); } /** * @brief Convert to backend representation * * We define this function so that we can use State in the * argument list and as a return type. */ inline operator AnyType() const { return mStorage; } /** * @brief Initialize the marginal variance calculation state. * * This function is only called for the first iteration, for the first row. */ inline void initialize(const Allocator &inAllocator, const uint16_t inWidthOfX, const uint16_t inNumBasis, const uint16_t inNumCategoricals) { mStorage = inAllocator.allocateArray( arraySize(inWidthOfX, inNumBasis, inNumCategoricals)); rebind(inWidthOfX, inNumBasis, inNumCategoricals); widthOfX = inWidthOfX; numBasis = inNumBasis; numCategoricalVarsInSubset = inNumCategoricals; } /** * @brief We need to support assigning the previous state */ template MarginsCoxPropHazardState &operator=( const MarginsCoxPropHazardState &inOtherState) { for (size_t i = 0; i < mStorage.size(); i++) mStorage[i] = inOtherState.mStorage[i]; return *this; } /** * @brief Merge with another State object by copying the intra-iteration * fields */ template MarginsCoxPropHazardState &operator+=( const MarginsCoxPropHazardState &inOtherState) { if (mStorage.size() != inOtherState.mStorage.size() || widthOfX != inOtherState.widthOfX) throw std::logic_error("Internal error: Incompatible transition " "states"); numRows += inOtherState.numRows; marginal_effects += inOtherState.marginal_effects; delta += inOtherState.delta; return *this; } /** * @brief Reset the inter-iteration fields. */ inline void reset() { numRows = 0; baseline_hazard = 0; marginal_effects.fill(0); training_data_vcov.fill(0); delta.fill(0); if (numCategoricalVarsInSubset > 0) categorical_basis_indices.fill(0); } private: static inline size_t arraySize(const uint16_t inWidthOfX, const uint16_t inNumBasis, const uint16_t inNumCategoricalVars) { return 5 + inNumBasis + inNumCategoricalVars + (inWidthOfX + inNumBasis) * inWidthOfX; } /** * @brief Rebind to a new storage array * * @param inWidthOfX The number of independent variables. * */ void rebind(const uint16_t inWidthOfX, const uint16_t inNumBasis, const uint16_t inNumCategoricalVars) { widthOfX.rebind(&mStorage[0]); numBasis.rebind(&mStorage[1]); numCategoricalVarsInSubset.rebind(&mStorage[2]); numRows.rebind(&mStorage[3]); baseline_hazard.rebind(&mStorage[4]); marginal_effects.rebind(&mStorage[5], inNumBasis); training_data_vcov.rebind(&mStorage[5 + inNumBasis], inWidthOfX, inWidthOfX); delta.rebind(&mStorage[5 + inNumBasis + inWidthOfX*inWidthOfX], inNumBasis, inWidthOfX); if (inNumCategoricalVars > 0) categorical_basis_indices.rebind(&mStorage[5 + inNumBasis + (inWidthOfX + inNumBasis)*inWidthOfX], inNumCategoricalVars); } Handle mStorage; public: typename HandleTraits::ReferenceToUInt16 widthOfX; typename HandleTraits::ReferenceToUInt16 numBasis; typename HandleTraits::ReferenceToUInt16 numCategoricalVarsInSubset; typename HandleTraits::ReferenceToUInt64 numRows; typename HandleTraits::ReferenceToDouble baseline_hazard; typename HandleTraits::ColumnVectorTransparentHandleMap marginal_effects; typename HandleTraits::MatrixTransparentHandleMap training_data_vcov; typename HandleTraits::MatrixTransparentHandleMap delta; typename HandleTraits::ColumnVectorTransparentHandleMap categorical_basis_indices; }; // ---------------------------------------------------------------------- /** * @brief Perform the marginal effects transition step */ AnyType margins_coxph_int_transition::run(AnyType &args) { MarginsCoxPropHazardState > state = args[0]; if (args[1].isNull() || args[2].isNull() || args[3].isNull() || args[4].isNull()) { return args[0]; } MappedColumnVector f; try { // an exception is raised in the backend if args[1] contains nulls MappedColumnVector xx = args[1].getAs(); // x is a const reference, we can only rebind to change its pointer f.rebind(xx.memoryHandle(), xx.size()); } catch (const ArrayWithNullException &e) { return args[0]; } // The following check was added with MADLIB-138. if (!dbal::eigen_integration::isfinite(f)) throw std::domain_error("Design matrix is not finite."); // beta is the coefficient vector from cox proportional hazards MappedColumnVector beta = args[2].getAs(); // basis_indices represents the indices (of beta) for which we want to // compute the marginal effect. We don't need to compute the ME for all // variables, thus basis_indices could be a subset of all indices. MappedColumnVector basis_indices = args[4].getAs(); // all variable symbols correspond to the design document const uint16_t N = static_cast(beta.size()); const uint16_t M = static_cast(basis_indices.size()); assert(N >= M); Matrix J; // J: N x M if (args[5].isNull()){ J = Matrix::Zero(N, M); for (Index i = 0; i < M; ++i) J(static_cast(basis_indices(i)), i) = 1; } else{ J = args[5].getAs(); } assert(J.rows() == N && J.cols() == M); MappedColumnVector categorical_indices; uint16_t numCategoricalVars = 0; if (!args[6].isNull()) { // categorical_indices represents which indices (from beta) are // categorical variables try { MappedColumnVector xx = args[6].getAs(); categorical_indices.rebind(xx.memoryHandle(), xx.size()); } catch (const ArrayWithNullException &e) { throw std::runtime_error("The categorical indices contain NULL values"); } numCategoricalVars = static_cast(categorical_indices.size()); } if (state.numRows == 0) { if (f.size() > std::numeric_limits::max()) throw std::domain_error("Number of independent variables cannot be " "larger than 65535."); std::vector tmp_cat_basis_indices; if (numCategoricalVars > 0){ // find which of the variables in basis_indices are categorical // and store only the indices for these variables in our state for (Index i = 0; i < basis_indices.size(); ++i){ for (Index j = 0; j < categorical_indices.size(); ++j){ if (basis_indices(i) == categorical_indices(j)){ tmp_cat_basis_indices.push_back(static_cast(i)); continue; } } } state.numCategoricalVarsInSubset = static_cast(tmp_cat_basis_indices.size()); } state.initialize(*this, static_cast(N), static_cast(M), static_cast(state.numCategoricalVarsInSubset)); state.reset(); // coxph stores the Hessian matrix in it's output table. This is // different from the various regression methods that provides the // training vcov matrix directly. Matrix hessian = args[3].getAs(); // Computing pseudo inverse of a PSD matrix SymmetricPositiveDefiniteEigenDecomposition decomposition( hessian, EigenvaluesOnly, ComputePseudoInverse); state.training_data_vcov = decomposition.pseudoInverse(); if (state.numCategoricalVarsInSubset > 0){ for (int i=0; i < state.numCategoricalVarsInSubset; ++i){ state.categorical_basis_indices(i) = tmp_cat_basis_indices[i]; } } } // Now do the transition step state.numRows++; double f_beta = dot(f, beta); state.baseline_hazard += f_beta; double exp_f_beta = exp(f_beta); // compute marginal effects and delta using 1st and 2nd derivatives ColumnVector J_trans_beta; J_trans_beta = trans(J) * beta; ColumnVector curr_margins = exp_f_beta * J_trans_beta; Matrix curr_delta = exp_f_beta * (trans(J) + J_trans_beta * trans(f)); // f_set_mat and f_unset_mat are matrices where each row corresponds to a // categorical basis variable and the columns correspond to all terms // (basis and interaction) Matrix f_set_mat; // numCategoricalVarsInSubset x N Matrix f_unset_mat; // numCategoricalVarsInSubset x N if (!args[7].isNull() && !args[8].isNull()){ // the matrix is read in column-order but passed in row-order f_set_mat = args[7].getAs(); f_set_mat.transposeInPlace(); f_unset_mat = args[8].getAs(); f_unset_mat.transposeInPlace(); } // PERFORMANCE TWEAK: for the no interaction case, f_set_mat and f_unset_mat // only need column entries for the categorical variables (others are same // as f). Since, passing a smaller matrix into the transition function is // faster, for the no interaction case, we don't need to input the whole // matrix into this function. For the interaction case, since it is unknown // which indices have interaction, we require entries for all columns in the // matrices. bool no_interactions = (f_set_mat.cols() < N); for (Index i = 0; i < state.numCategoricalVarsInSubset; ++i) { // Note: categorical_indices are assumed to be zero-based ColumnVector f_set; ColumnVector f_unset; ColumnVector shortened_f_set = f_set_mat.row(i); ColumnVector shortened_f_unset = f_unset_mat.row(i); if (no_interactions){ f_set = f; f_unset = f; for (Index j=0; j < shortened_f_set.size(); ++j){ f_set(static_cast(categorical_indices(j))) = shortened_f_set(j); f_unset(static_cast(categorical_indices(j))) = shortened_f_unset(j); } } else { f_set = shortened_f_set; f_unset = shortened_f_unset; } double h_set = exp(dot(f_set, beta)); double h_unset = exp(dot(f_unset, beta)); curr_margins(static_cast(state.categorical_basis_indices(i))) = h_set - h_unset; curr_delta.row(static_cast(state.categorical_basis_indices(i))) = (h_set * trans(f_set) - h_unset * trans(f_unset)); } state.marginal_effects += curr_margins; state.delta += curr_delta; return state; } /** * @brief Cox Proportional Hazards marginal effects: Merge transition states */ AnyType margins_coxph_int_merge::run(AnyType &args) { MarginsCoxPropHazardState > stateLeft = args[0]; MarginsCoxPropHazardState > stateRight = args[1]; // We first handle the trivial case where this function is called with one // of the states being the initial state if (stateLeft.numRows == 0) return stateRight; else if (stateRight.numRows == 0) return stateLeft; // Merge states together and return stateLeft += stateRight; return stateLeft; } /** * @brief Cox Proportional Hazards marginal effects: Final step */ AnyType margins_coxph_int_final::run(AnyType &args) { // We request a mutable object. // Depending on the backend, this might perform a deep copy. MarginsCoxPropHazardState > state = args[0]; // Aggregates that haven't seen any data just return Null. if (state.numRows == 0) return Null(); state.baseline_hazard = exp(state.baseline_hazard / static_cast(state.numRows)); state.baseline_hazard = 1; state.marginal_effects /= static_cast(state.numRows) / state.baseline_hazard; // Variance for marginal effects according to the delta method Matrix variance; variance = state.delta * state.training_data_vcov; // we only need the diagonal elements of the variance, so we perform a dot // product of each row with state.delta to compute each diagonal element. // We divide by numRows^2 since we need the average variance ColumnVector std_err = variance.cwiseProduct(state.delta).rowwise().sum(); std_err = std_err.array().sqrt() / static_cast(state.numRows); MutableNativeColumnVector tStats(this->allocateArray(state.numBasis)); MutableNativeColumnVector pValues(this->allocateArray(state.numBasis)); for (Index i = 0; i < state.numBasis; ++i) { tStats(i) = state.marginal_effects(i) / std_err(i); // p-values only make sense if numRows > coef.size() if (state.numRows > state.numBasis) pValues(i) = 2. * prob::cdf( prob::normal(), -std::abs(tStats(i))); } // Return all coefficients, standard errors, etc. in a tuple // Note: p-values will return NULL if numRows <= coef.size AnyType tuple; tuple << state.marginal_effects << std_err << tStats << (state.numRows > state.numBasis ? pValues: Null()); return tuple; } /** * @brief Cox Proportional Hazards marginal effects: Statistics function */ AnyType margins_compute_stats::run(AnyType &args) { // NULL input returns a NULL output if (args[0].isNull() || args[1].isNull()) { return Null(); } MappedColumnVector marginal_effects = args[0].getAs(); MappedColumnVector std_err = args[1].getAs(); uint16_t n_basis_terms = static_cast(marginal_effects.size()); MutableNativeColumnVector tStats( (*this).allocateArray(n_basis_terms)); MutableNativeColumnVector pValues( (*this).allocateArray(n_basis_terms)); for (Index i = 0; i < n_basis_terms; ++i) { tStats(i) = marginal_effects(i) / std_err(i); pValues(i) = 2. * prob::cdf( prob::normal(), -std::abs(tStats(i))); } // Return all standard errors and p_values in a tuple AnyType tuple; tuple << marginal_effects << std_err << tStats << pValues; return tuple; } // ----------------- End of Marginal Effects for Cox proportional hazards ------ } // namespace stats } // namespace modules } // namespace madlib