/* ----------------------------------------------------------------------- */
/**
 *
 * @file cox_proportional_hazards.cpp
 *
 * @brief Cox proportional hazards
 *
 */
/* ----------------------------------------------------------------------- */

#include <dbconnector/dbconnector.hpp>
#include <modules/prob/boost.hpp>

#include <vector>
#include <algorithm>
#include <cmath>
#include <cstring>

#include "coxph_improved.hpp"
#include "CoxPHState.hpp"

namespace madlib {
namespace modules {
namespace stats {

// Use Eigen
using namespace dbal::eigen_integration;

// Import names from other MADlib modules
using dbal::NoSolutionFoundException;

using namespace std;

// ----------------------------------------------------------------------

/**
 * @brief Compute the diagnostic statistics
 */
AnyType stateToResult(
        const Allocator &inAllocator,
        const ColumnVector &inCoef,
        const ColumnVector &diagonal_of_inverse_of_hessian,
        double logLikelihood,
        const Matrix &inHessian,
        int nIter,
        const ColumnVector &stds) {

    MutableNativeColumnVector coef(
        inAllocator.allocateArray<double>(inCoef.size()));
    MutableNativeColumnVector std_err(
        inAllocator.allocateArray<double>(inCoef.size()));
    MutableNativeColumnVector waldZStats(
        inAllocator.allocateArray<double>(inCoef.size()));
    MutableNativeColumnVector waldPValues(
        inAllocator.allocateArray<double>(inCoef.size()));

    for (Index i = 0; i < inCoef.size(); ++i) {
        // Scale the coefficients and standard errors back to the original
        // (unstandardized) covariates
        coef(i) = inCoef(i) / stds(i);
        std_err(i) = std::sqrt(diagonal_of_inverse_of_hessian(i)) / stds(i);
        waldZStats(i) = coef(i) / std_err(i);
        waldPValues(i) = 2. * prob::cdf(
            prob::normal(), -std::abs(waldZStats(i)));
    }

    // The Hessian, being symmetric, is updated as a lower triangular matrix.
    // We need to convert the triangular matrix to a full matrix before output.
    Matrix full_hessian = inHessian + inHessian.transpose();
    full_hessian.diagonal() /= 2;
    for (Index i = 0; i < inCoef.size(); ++i)
        for (Index j = 0; j < inCoef.size(); ++j)
            full_hessian(i,j) *= stds(i) * stds(j);

    // Return all coefficients, standard errors, etc. in a tuple
    AnyType tuple;
    tuple << coef << logLikelihood << std_err << waldZStats << waldPValues
          << full_hessian << nIter;
    return tuple;
}
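/*
 * A short worked example of the diagnostics computed above (illustrative
 * only; the numbers below are made up, and the standardization factors in
 * `stds` come from the driver layer). For one standardized coefficient
 * b = 0.5 with stds(i) = 2 and inverse-Hessian diagonal entry 0.04:
 *
 *   coef    = b / stds(i)            = 0.25
 *   std_err = sqrt(0.04) / stds(i)   = 0.1
 *   z       = coef / std_err         = 2.5
 *   p       = 2 * Phi(-|z|)          ~ 0.0124
 *
 * i.e. the two-sided Wald p-value under a standard-normal reference.
 */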
// ------------------------------------------------------------

AnyType split_transition::run(AnyType &args) {
    double val = args[1].getAs<double>();
    int buff_size = args[2].getAs<int>();
    int num_splits = args[3].getAs<int>();
    if (num_splits == 1) return Null();

    // state layout: [0] = number of buffered values,
    // [1] = requested number of splits, [2..] = value buffer
    MutableArrayHandle<double> state(NULL);
    if (args[0].isNull()) {
        state = allocateArray<double>(buff_size + 2);
        state[0] = 0;
        state[1] = num_splits;
    } else {
        state = args[0].getAs<MutableArrayHandle<double> >();
    }

    // The pre-allocated buffer has been filled up
    if (state[0] >= buff_size) { return state; }

    state[static_cast<size_t>(++state[0]) + 1] = val;
    return state;
}

// ------------------------------------------------------------

AnyType split_merge::run(AnyType &args) {
    if (args[0].isNull()) return args[1];
    if (args[1].isNull()) return args[0];

    ArrayHandle<double> state1 = args[0].getAs<ArrayHandle<double> >();
    ArrayHandle<double> state2 = args[1].getAs<ArrayHandle<double> >();
    int merged_size = static_cast<int>(state1[0]) + static_cast<int>(state2[0]);

    MutableArrayHandle<double> merged_state = allocateArray<double>(merged_size + 2);
    merged_state[0] = merged_size;
    merged_state[1] = state1[1];
    memcpy(merged_state.ptr() + 2, state1.ptr() + 2,
           static_cast<size_t>(state1[0]) * sizeof(double));
    memcpy(merged_state.ptr() + 2 + static_cast<size_t>(state1[0]),
           state2.ptr() + 2,
           static_cast<size_t>(state2[0]) * sizeof(double));

    return merged_state;
}

// ------------------------------------------------------------

AnyType split_final::run(AnyType &args) {
    if (args[0].isNull()) return args[0];

    MutableArrayHandle<double> state = args[0].getAs<MutableArrayHandle<double> >();
    int num_splits = static_cast<int>(state[1]);
    if (num_splits == 1) { return Null(); }

    int size_splits = static_cast<int>(state[0]) / num_splits;
    if (num_splits > state[0])
        throw std::runtime_error("The number of splits is too large.");

    // Sort the sample
    std::sort(state.ptr() + 2, state.ptr() + state.size());

    MutableArrayHandle<double> splits = allocateArray<double>(num_splits - 1);
    for (int i = 0; i < num_splits - 1; i++)
        splits[i] = state[size_splits * (i + 1) - 1 + 2];
    return splits;
}
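/*
 * Illustrative note on the split_* aggregate above (a sketch of the intended
 * data flow, inferred from the code rather than from upstream documentation):
 * the transition step buffers up to `buff_size` sampled values, the merge
 * step concatenates the buffers of parallel segments, and the final step
 * sorts the pooled sample and returns num_splits - 1 approximate quantile
 * boundaries; e.g. for num_splits = 4 it returns the sample values near the
 * 25th, 50th and 75th percentiles. compute_grpid below then maps a value to
 * its group id by binary search (std::lower_bound) over these split points.
 */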
// ------------------------------------------------------------

AnyType compute_grpid::run(AnyType &args) {
    // When no split points are provided, simply return 0 (single group).
    // This can be used to deal with the very-small-strata case.
    if (args[0].isNull()) return 0;

    MappedColumnVector splits = args[0].getAs<MappedColumnVector>();
    double val = args[1].getAs<double>();
    // Decreasing group ids from left to right
    bool inverse = args[2].getAs<bool>();

    std::vector<double> v(splits.data(), splits.data() + splits.size());
    if (inverse)
        return static_cast<int>(
            v.end() - std::lower_bound(v.begin(), v.end(), val));
    else
        return static_cast<int>(
            std::lower_bound(v.begin(), v.end(), val) - v.begin());
}

// ------------------------------------------------------------

AnyType compute_coxph_result::run(AnyType &args) {
    MappedColumnVector coef = args[0].getAs<MappedColumnVector>();
    double L = args[1].getAs<double>();
    MappedColumnVector d2L = args[2].getAs<MappedColumnVector>();
    int nIter = args[3].getAs<int>();
    MappedColumnVector stds = args[4].getAs<MappedColumnVector>();

    int m = static_cast<int>(coef.size());
    Matrix hessian = d2L;
    hessian.resize(m, m);

    SymmetricPositiveDefiniteEigenDecomposition<Matrix> decomposition(
        hessian, EigenvaluesOnly, ComputePseudoInverse);

    return stateToResult(*this, coef,
                         decomposition.pseudoInverse().diagonal(),
                         L, hessian, nIter, stds);
}

// ------------------------------------------------------------

// The transition function.
// No need to deal with NULL, because all NULL values have been
// filtered out while creating the re-distributed table.
AnyType coxph_improved_step_transition::run(AnyType& args) {
    // Current state, independent variables & dependent variables
    CoxPHState<MutableArrayHandle<double> > state = args[0];
    MappedColumnVector y = args[2].getAs<MappedColumnVector>();
    // status is converted to int in the Python code
    ArrayHandle<int> status = args[3].getAs<ArrayHandle<int> >();
    MappedColumnVector coef = args[4].getAs<MappedColumnVector>();
    MappedColumnVector max_coef = args[5].getAs<MappedColumnVector>();
    MappedMatrix xx = args[1].getAs<MappedMatrix>();

    // The following check was added with MADLIB-138.
    if (!dbal::eigen_integration::isfinite(xx))
        throw std::domain_error("Design matrix is not finite.");

    if (state.numRows == 0) {
        state.initialize(*this, static_cast<uint16_t>(coef.size()), coef.data());
        for (size_t i = 0; i < state.widthOfX; i++)
            state.max_coef(i) = max_coef(i);
    }

    for (int i = 0; i < xx.cols(); i++) {
        const ColumnVector& x = xx.col(i);
        double exp_coef_x = std::exp(trans(coef) * x);
        state.numRows++;

        /**
         * In case of a tied time of death or in the first iteration, we must
         * only perform the "pre-computations". When the tie is resolved we
         * add up all the pre-computations once and for all. This is an
         * implementation of Breslow's method.
         * The times of death of two records are considered "equal" if they
         * differ by less than 1.0e-6.
         * Also, in case status = 0, the observation is censored, so no
         * computations are required.
         */
        if (std::abs(y[i] - state.y_previous) < 1.0e-6 || state.numRows == 1) {
            if (status[i] == 1) { state.multiplier++; }
        } else {
            state.grad -= state.multiplier*state.H/state.S;
            triangularView<Lower>(state.hessian) -=
                ((state.H*trans(state.H))/(state.S*state.S)
                    - state.V/state.S)*state.multiplier;
            state.logLikelihood -= state.multiplier*std::log(state.S);
            state.multiplier = status[i];
        }

        /**
         * These computations must always be performed, irrespective of whether
         * there are ties or not.
         * Note: see the design documentation for details on the implementation.
         */
        state.S += exp_coef_x;
        state.H += exp_coef_x * x;
        state.V += x * trans(x) * exp_coef_x;
        state.y_previous = y[i];
        if (status[i] == 1) {
            state.tdeath += 1;
            state.grad += x;
            state.logLikelihood += std::log(exp_coef_x);
        }
    }

    return state;
}
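/*
 * For reference, a sketch of the quantities accumulated above, in the usual
 * Breslow-approximation notation (this summary is inferred from the code and
 * the design document it refers to, not taken verbatim from either). With
 * S = sum over the current risk set of exp(coef'x), H = sum of x*exp(coef'x),
 * V = sum of x*x'*exp(coef'x), and d tied deaths at a death time
 * (state.multiplier), each resolved death time contributes
 *
 *   logLikelihood += sum_{deaths} coef'x  -  d*log(S)
 *   grad          += sum_{deaths} x       -  d*H/S
 *   hessian       += d*(V/S - H*H'/S^2)
 *                    (written above as hessian -= (H*H'/S^2 - V/S)*d,
 *                     so `hessian` holds the observed information matrix)
 *
 * This presumes the rows arrive ordered so that the rows already processed
 * form the risk set at each death time, presumably arranged when the
 * re-distributed table is created.
 */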
// ------------------------------------------------------------

/**
 * @brief Newton method final step for Cox proportional hazards
 */
// This is the same as the old implementation.
AnyType coxph_improved_step_final::run(AnyType& args) {
    CoxPHState<MutableArrayHandle<double> > state = args[0];

    // If we haven't seen any data, just return Null.
    if (state.numRows == 0) return Null();

    if (!state.hessian.is_finite() || !state.grad.is_finite())
        throw NoSolutionFoundException("Over- or underflow in intermediate "
                                       "calculation. Input data is likely of "
                                       "poor numerical condition.");

    // First merge all tied times of death for the last row
    state.grad -= state.multiplier*state.H/state.S;
    triangularView<Lower>(state.hessian) -=
        ((state.H*trans(state.H))/(state.S*state.S)
            - state.V/state.S)*state.multiplier;
    state.logLikelihood -= state.multiplier*std::log(state.S);

    if (std::isinf(static_cast<double>(state.logLikelihood)) ||
        std::isnan(static_cast<double>(state.logLikelihood)))
        throw NoSolutionFoundException("Over- or underflow in intermediate "
                                       "calculation. Input data is likely of "
                                       "poor numerical condition.");

    // Compute the pseudo-inverse of a PSD matrix
    SymmetricPositiveDefiniteEigenDecomposition<Matrix> decomposition(
        state.hessian, EigenvaluesOnly, ComputePseudoInverse);
    Matrix inverse_of_hessian = decomposition.pseudoInverse();

    // Newton step
    // state.coef += state.hessian.inverse()*state.grad;
    state.coef += inverse_of_hessian * state.grad;

    // Limit the values of coef if necessary
    if (state.max_coef(0) > -1) {
        // subsequent iterations use max_coef
        for (size_t i = 0; i < state.widthOfX; i++)
            if (state.coef(i) > state.max_coef(i))
                state.coef(i) = state.max_coef(i);
            else if (state.coef(i) < - state.max_coef(i))
                state.coef(i) = - state.max_coef(i);
    } else {
        // the first iteration computes max_coef
        for (size_t i = 0; i < state.widthOfX; i++)
            state.max_coef(i) = 20 * sqrt(state.hessian(i,i) / state.tdeath);
    }

    // Return all coefficients etc. in a tuple
    AnyType tuple;
    tuple << state.coef
          << static_cast<double>(state.logLikelihood)
          << MappedColumnVector(state.hessian.data(),
                                state.hessian.rows() * state.hessian.cols())
             // the Python layer doesn't support 2-d arrays
          << state.max_coef;
    return tuple;
}

// ------------------------------------------------------------

AnyType coxph_improved_strata_step_final::run(AnyType& args) {
    CoxPHState<MutableArrayHandle<double> > state = args[0];

    // If we haven't seen any data, just return Null.
    if (state.numRows == 0) return Null();

    if (!state.hessian.is_finite() || !state.grad.is_finite())
        throw NoSolutionFoundException("Over- or underflow in intermediate "
                                       "calculation. Input data is likely of "
                                       "poor numerical condition.");

    // Compute the pseudo-inverse of a PSD matrix
    SymmetricPositiveDefiniteEigenDecomposition<Matrix> decomposition(
        state.hessian, EigenvaluesOnly, ComputePseudoInverse);
    Matrix inverse_of_hessian = decomposition.pseudoInverse();

    // Newton step
    // state.coef += state.hessian.inverse()*state.grad;
    state.coef += inverse_of_hessian * state.grad;

    // Limit the values of coef if necessary
    if (state.max_coef(0) > 0) {
        // subsequent iterations use max_coef
        for (size_t i = 0; i < state.widthOfX; i++)
            if (state.coef(i) > state.max_coef(i))
                state.coef(i) = state.max_coef(i);
            else if (state.coef(i) < - state.max_coef(i))
                state.coef(i) = - state.max_coef(i);
    } else {
        // the first iteration computes max_coef
        for (size_t i = 0; i < state.widthOfX; i++)
            state.max_coef(i) = 20 * sqrt(state.hessian(i,i) / state.tdeath);
    }

    // Return all coefficients etc. in a tuple
    AnyType tuple;
    tuple << state.coef
          << static_cast<double>(state.logLikelihood)
          << MappedColumnVector(state.hessian.data(),
                                state.hessian.rows() * state.hessian.cols())
             // the Python layer doesn't support 2-d arrays
          << state.max_coef;
    return tuple;
}
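/*
 * Note on the max_coef clamping used in the two final functions above
 * (inferred from the code; the factor of 20 is an implementation choice of
 * this module, not a documented constant): on the first iteration each bound
 * is set to
 *
 *   max_coef(i) = 20 * sqrt(hessian(i,i) / tdeath)
 *
 * i.e. a multiple of the per-death curvature scale, and later Newton updates
 * are clipped to [-max_coef(i), +max_coef(i)]. This presumably guards against
 * divergent steps when the partial likelihood is nearly flat or monotone
 * (e.g. quasi-separated data).
 */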
// ------------------------------------------------------------

AnyType array_avg_transition::run(AnyType& args) {
    if (args[1].isNull()) { return args[0]; }

    MappedColumnVector x;
    try {
        MappedColumnVector xx = args[1].getAs<MappedColumnVector>();
        x.rebind(xx.memoryHandle(), xx.size());
    } catch (const ArrayWithNullException &e) {
        return args[0];
    }
    bool use_abs = args[2].getAs<bool>();

    // state layout: [0] = row count, [1..] = running (possibly absolute) sums
    MutableArrayHandle<double> state(NULL);
    if (args[0].isNull())
        state = allocateArray<double>(x.size() + 1);
    else
        state = args[0].getAs<MutableArrayHandle<double> >();

    state[0] += 1;
    if (use_abs)
        for (int i = 1; i <= x.size(); i++) state[i] += fabs(x(i-1));
    else
        for (int i = 1; i <= x.size(); i++) state[i] += x(i-1);
    return state;
}

// ------------------------------------------------------------

AnyType array_avg_merge::run(AnyType& args) {
    if (args[0].isNull()) return args[1];
    if (args[1].isNull()) return args[0];

    MutableArrayHandle<double> left = args[0].getAs<MutableArrayHandle<double> >();
    ArrayHandle<double> right = args[1].getAs<ArrayHandle<double> >();
    for (size_t i = 0; i < left.size(); i++)
        left[i] += right[i];
    return left;
}

// ------------------------------------------------------------

AnyType array_avg_final::run(AnyType& args) {
    if (args[0].isNull()) return args[0];

    ArrayHandle<double> state = args[0].getAs<ArrayHandle<double> >();
    MutableArrayHandle<double> avg = allocateArray<double>(state.size() - 1);
    for (size_t i = 0; i < avg.size(); i++)
        avg[i] = state[i+1] / state[0];
    return avg;
}

// ------------------------------------------------------------

AnyType array_element_min::run(AnyType &args) {
    if (args[0].isNull()) return args[1];
    if (args[1].isNull()) return args[0];

    MutableMappedColumnVector state = args[0].getAs<MutableMappedColumnVector>();
    MappedColumnVector array = args[1].getAs<MappedColumnVector>();
    if (state.size() != array.size())
        throw std::runtime_error("Dimension mismatch between the state and "
                                 "the input array.");
    for (int i = 0; i < state.size(); i++)
        state(i) = std::min(state(i), array(i));
    return state;
}

// ------------------------------------------------------------

AnyType array_element_max::run(AnyType &args) {
    if (args[0].isNull()) return args[1];
    if (args[1].isNull()) return args[0];

    MutableMappedColumnVector state = args[0].getAs<MutableMappedColumnVector>();
    MappedColumnVector array = args[1].getAs<MappedColumnVector>();
    if (state.size() != array.size())
        throw std::runtime_error("Dimension mismatch between the state and "
                                 "the input array.");
    for (int i = 0; i < state.size(); i++)
        state(i) = std::max(state(i), array(i));
    return state;
}

} // namespace stats
} // namespace modules
} // namespace madlib