/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 * @file mlp_igd.cpp
 *
 * @brief Multilayer Perceptron functions
 *
 *//* ----------------------------------------------------------------------- */

// The original include arguments were stripped during extraction; the three
// reconstructed system headers below are inferred from what the code uses
// (std::abs, the dbconnector types, and HandleTraits).
#include <cmath>
#include <dbconnector/dbconnector.hpp>
#include <modules/shared/HandleTraits.hpp>

#include "mlp_igd.hpp"

#include "task/mlp.hpp"
#include "task/l2.hpp"
#include "algo/igd.hpp"
#include "algo/loss.hpp"

#include "type/tuple.hpp"
#include "type/model.hpp"
#include "type/state.hpp"

namespace madlib {

namespace modules {

namespace convex {

// These 2 classes contain public static methods that can be called
typedef IGD<MLPIGDState<MutableArrayHandle<double> >, MLPIGDState<ArrayHandle<double> >,
        MLP<MLPModel<MutableArrayHandle<double> >, MLPTuple > > MLPIGDAlgorithm;

typedef IGD<MLPMiniBatchState<MutableArrayHandle<double> >, MLPMiniBatchState<ArrayHandle<double> >,
        MLP<MLPModel<MutableArrayHandle<double> >, MiniBatchTuple > > MLPMiniBatchAlgorithm;

typedef Loss<MLPIGDState<MutableArrayHandle<double> >, MLPIGDState<ArrayHandle<double> >,
        MLP<MLPModel<MutableArrayHandle<double> >, MLPTuple > > MLPLossAlgorithm;

typedef MLP<MLPModel<MutableArrayHandle<double> >, MLPTuple> MLPTask;

typedef MLPModel<MutableArrayHandle<double> > MLPModelType;

/**
 * @brief Perform the multilayer perceptron transition step
 *
 * Called for each tuple.
 */
AnyType
mlp_igd_transition::run(AnyType &args) {
    // For the first tuple: args[0] is nothing more than a marker that
    // indicates that we should do some initial operations.
    // For other tuples: args[0] holds the computation state until last tuple
    MLPIGDState<MutableArrayHandle<double> > state = args[0];

    // initialize the state if first tuple
    if (state.algo.numRows == 0) {
        if (!args[3].isNull()) {
            MLPIGDState<ArrayHandle<double> > previousState = args[3];
            state.allocate(*this, previousState.task.numberOfStages,
                           previousState.task.numbersOfUnits);
            state = previousState;
        } else {
            // configuration parameters and initialization
            // this is run only once (first iteration, first tuple)
            ArrayHandle<double> numbersOfUnits = args[4].getAs<ArrayHandle<double> >();
            uint16_t numberOfStages = static_cast<uint16_t>(numbersOfUnits.size() - 1);

            state.allocate(*this, numberOfStages,
                           reinterpret_cast<const double *>(numbersOfUnits.ptr()));
            state.task.stepsize = args[5].getAs<double>();
            state.task.model.activation = static_cast<double>(args[6].getAs<int>());
            state.task.model.is_classification = static_cast<double>(args[7].getAs<int>());
            // args[8] is for weighting the input row, which is populated later.
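            // args[10] through args[12] carry the remaining hyperparameters,
            // read next: the L2 regularization strength (lambda), the momentum
            // coefficient, and the Nesterov-momentum flag.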
            state.task.lambda = args[10].getAs<double>();
            MLPTask::lambda = state.task.lambda;
            state.task.model.momentum = args[11].getAs<double>();
            state.task.model.is_nesterov = static_cast<double>(args[12].getAs<bool>());

            if (!args[9].isNull()) {
                // initial coefficients are provided
                MappedColumnVector warm_start_coeff = args[9].getAs<MappedColumnVector>();

                // copy warm start into the task model;
                // state.reset() ensures algo.incrModel is copied from task.model
                Index layer_start = 0;
                for (size_t k = 0; k < numberOfStages; ++k) {
                    for (Index j = 0; j < state.task.model.u[k].cols(); ++j) {
                        for (Index i = 0; i < state.task.model.u[k].rows(); ++i) {
                            state.task.model.u[k](i, j) = warm_start_coeff(
                                layer_start + j * state.task.model.u[k].rows() + i);
                        }
                    }
                    layer_start += state.task.model.u[k].rows() *
                                   state.task.model.u[k].cols();
                }
            } else {
                // initialize the model with appropriate coefficients
                state.task.model.initialize(
                    numberOfStages,
                    reinterpret_cast<const double *>(numbersOfUnits.ptr()));
            }
        }
        // resetting in either case
        state.reset();
    }

    MLPTuple tuple;
    try {
        tuple.indVar = args[1].getAs<MappedColumnVector>();
        tuple.depVar = args[2].getAs<MappedColumnVector>();
    } catch (const ArrayWithNullException &e) {
        return args[0];
    }
    tuple.weight = args[8].getAs<double>();

    MLPIGDAlgorithm::transition(state, tuple);
    // Use the model from the previous iteration to compute the loss (note that
    // it is stored in the Task's state, while the Algo's state holds the model
    // from the current iteration).
    MLPLossAlgorithm::transition(state, tuple);
    state.algo.numRows++;

    return state;
}

/**
 * @brief Perform the preliminary aggregation function: Merge transition states
 */
AnyType
mlp_igd_merge::run(AnyType &args) {
    MLPIGDState<MutableArrayHandle<double> > stateLeft = args[0];
    MLPIGDState<ArrayHandle<double> > stateRight = args[1];

    if (stateLeft.algo.numRows == 0) { return stateRight; }
    else if (stateRight.algo.numRows == 0) { return stateLeft; }

    MLPIGDAlgorithm::merge(stateLeft, stateRight);
    MLPLossAlgorithm::merge(stateLeft, stateRight);
    // The following numRows update cannot be put above, because the model
    // averaging depends on their original values
    stateLeft.algo.numRows += stateRight.algo.numRows;

    return stateLeft;
}

/**
 * @brief Perform the multilayer perceptron final step
 */
AnyType
mlp_igd_final::run(AnyType &args) {
    // We request a mutable object. Depending on the backend, this might perform
    // a deep copy.
    MLPIGDState<MutableArrayHandle<double> > state = args[0];

    if (state.algo.numRows == 0) { return Null(); }

    L2<MLPModelType>::lambda = state.task.lambda;
    state.algo.loss = state.algo.loss / static_cast<double>(state.algo.numRows);
    state.algo.loss += L2<MLPModelType>::loss(state.task.model);
    MLPIGDAlgorithm::final(state);
    return state;
}

/**
 * @brief Perform the multilayer perceptron minibatch transition step
 *
 * Called for each tuple.
 */
AnyType
mlp_minibatch_transition::run(AnyType &args) {
    // For the first tuple: args[0] is nothing more than a marker that
    // indicates that we should do some initial operations.
    // For other tuples: args[0] holds the computation state until last tuple
    MLPMiniBatchState<MutableArrayHandle<double> > state = args[0];

    // initialize the state if first tuple
    if (state.numRows == 0) {
        if (!args[3].isNull()) {
            MLPMiniBatchState<ArrayHandle<double> > previousState = args[3];
            state.allocate(*this, previousState.numberOfStages,
                           previousState.numbersOfUnits);
            state = previousState;
        } else {
            // configuration parameters
            ArrayHandle<double> numbersOfUnits = args[4].getAs<ArrayHandle<double> >();
            uint16_t numberOfStages = static_cast<uint16_t>(numbersOfUnits.size() - 1);
            state.allocate(*this, numberOfStages,
                           reinterpret_cast<const double *>(numbersOfUnits.ptr()));
            state.stepsize = args[5].getAs<double>();
            state.model.activation = static_cast<double>(args[6].getAs<int>());
            state.model.is_classification = static_cast<double>(args[7].getAs<int>());
            // args[8] is for weighting the input row, which is populated later.
            state.model.momentum = args[13].getAs<double>();
            state.model.is_nesterov = static_cast<double>(args[14].getAs<bool>());
            if (!args[9].isNull()) {
                // initial coefficients are provided: copy the warm start into
                // the model
                MappedColumnVector warm_start_coeff = args[9].getAs<MappedColumnVector>();
                Index layer_start = 0;
                for (size_t k = 0; k < numberOfStages; ++k) {
                    for (Index j = 0; j < state.model.u[k].cols(); ++j) {
                        for (Index i = 0; i < state.model.u[k].rows(); ++i) {
                            state.model.u[k](i, j) = warm_start_coeff(
                                layer_start + j * state.model.u[k].rows() + i);
                        }
                    }
                    layer_start += state.model.u[k].rows() * state.model.u[k].cols();
                }
            } else {
                // initialize the model with appropriate coefficients
                state.model.initialize(
                    numberOfStages,
                    reinterpret_cast<const double *>(numbersOfUnits.ptr()));
            }
            state.lambda = args[10].getAs<double>();
            MLPTask::lambda = state.lambda;
            state.batchSize = static_cast<uint16_t>(args[11].getAs<int>());
            state.nEpochs = static_cast<uint16_t>(args[12].getAs<int>());
        }
        // resetting in either case
        state.reset();
    }

    MiniBatchTuple tuple;
    try {
        // Ideally there should be no NULLs in the pre-processed input data,
        // but keep this in a try block in case the user has modified the
        // pre-processed data in any way.
        // The matrices are by default read as column-major. We have to
        // transpose them to get back the matrices as they are stored in the
        // database.
        tuple.indVar = trans(args[1].getAs<MappedMatrix>());
        tuple.depVar = trans(args[2].getAs<MappedMatrix>());
    } catch (const ArrayWithNullException &e) {
        return args[0];
    }
    tuple.weight = args[8].getAs<double>();

    /*
       Note that the IGD version uses the model in Task (the model from the
       previous iteration) to compute the loss. Minibatch uses the model from
       Algo (the model of the current iteration) to compute the loss. The
       difference in loss across one iteration is not significant, hence doing
       so here. We therefore don't need to maintain another copy of the model
       (from the previous iteration) in the state. The model for the current
       iteration and the loss are both computed in one function now.
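       Both the per-mini-batch gradient updates and the loss accumulation
       happen inside transitionInMiniBatch() below.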
    */
    MLPMiniBatchAlgorithm::transitionInMiniBatch(state, tuple);
    state.numRows += tuple.indVar.rows();
    return state;
}

/**
 * @brief Perform the preliminary aggregation function: Merge transition states
 */
AnyType
mlp_minibatch_merge::run(AnyType &args) {
    MLPMiniBatchState<MutableArrayHandle<double> > stateLeft = args[0];
    MLPMiniBatchState<ArrayHandle<double> > stateRight = args[1];

    if (stateLeft.numRows == 0) { return stateRight; }
    else if (stateRight.numRows == 0) { return stateLeft; }

    MLPMiniBatchAlgorithm::mergeInPlace(stateLeft, stateRight);

    // The following numRows update cannot be put above, because the model
    // averaging depends on their original values
    stateLeft.numRows += stateRight.numRows;
    stateLeft.loss += stateRight.loss;

    return stateLeft;
}

/**
 * @brief Perform the multilayer perceptron minibatch final step
 */
AnyType
mlp_minibatch_final::run(AnyType &args) {
    // We request a mutable object. Depending on the backend, this might perform
    // a deep copy.
    MLPMiniBatchState<MutableArrayHandle<double> > state = args[0];
    // Aggregates that haven't seen any data just return Null.
    if (state.numRows == 0) { return Null(); }

    L2<MLPModelType>::lambda = state.lambda;
    state.loss = state.loss / static_cast<double>(state.numRows);
    state.loss += L2<MLPModelType>::loss(state.model);
    return state;
}

/**
 * @brief Return the absolute difference in loss between two states
 */
AnyType
internal_mlp_igd_distance::run(AnyType &args) {
    MLPIGDState<ArrayHandle<double> > stateLeft = args[0];
    MLPIGDState<ArrayHandle<double> > stateRight = args[1];
    return std::abs(stateLeft.algo.loss - stateRight.algo.loss);
}

AnyType
internal_mlp_minibatch_distance::run(AnyType &args) {
    MLPMiniBatchState<ArrayHandle<double> > stateLeft = args[0];
    MLPMiniBatchState<ArrayHandle<double> > stateRight = args[1];
    return std::abs(stateLeft.loss - stateRight.loss);
}

/**
 * @brief Return the coefficients and diagnostic statistics of the state
 */
AnyType
internal_mlp_igd_result::run(AnyType &args) {
    MLPIGDState<ArrayHandle<double> > state = args[0];

    HandleTraits<ArrayHandle<double> >::ColumnVectorTransparentHandleMap flattenU;
    flattenU.rebind(&state.task.model.u[0](0, 0),
                    state.task.model.coeffArraySize(state.task.numberOfStages,
                                                    state.task.numbersOfUnits));

    AnyType tuple;
    tuple << flattenU
          << static_cast<double>(state.algo.loss);
    return tuple;
}

/**
 * @brief Return the coefficients and diagnostic statistics of the state
 */
AnyType
internal_mlp_minibatch_result::run(AnyType &args) {
    MLPMiniBatchState<ArrayHandle<double> > state = args[0];

    HandleTraits<ArrayHandle<double> >::ColumnVectorTransparentHandleMap flattenU;
    flattenU.rebind(&state.model.u[0](0, 0),
                    state.model.coeffArraySize(state.numberOfStages,
                                               state.numbersOfUnits));

    AnyType tuple;
    tuple << flattenU
          << static_cast<double>(state.loss);
    return tuple;
}

AnyType
internal_predict_mlp::run(AnyType &args) {
    MLPModel<MutableArrayHandle<double> > model;
    ColumnVector indVar;
    int is_response = args[5].getAs<int>();
    MappedColumnVector x_means = args[6].getAs<MappedColumnVector>();
    MappedColumnVector x_stds = args[7].getAs<MappedColumnVector>();
    MappedColumnVector coeff = args[0].getAs<MappedColumnVector>();
    MappedColumnVector layerSizes = args[4].getAs<MappedColumnVector>();
    // The input layer doesn't count as a stage
    uint16_t numberOfStages = static_cast<uint16_t>(layerSizes.size() - 1);
    double is_classification = args[2].getAs<double>();
    double activation = args[3].getAs<double>();
    int is_dep_var_array_for_classification = args[8].getAs<int>();
    bool is_classification_response = is_classification && is_response;

    // The model rebind function is called by both the predict and train
    // functions. Since we have to use the same function, we pass dummy values
    // for momentum and is_nesterov, because predict does not care about the
    // actual values of these params.
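    // rebind() overlays the flat coefficient array onto the per-layer weight
    // matrices without copying; the dummy slots merely satisfy its signature.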
    const double dummy_value = static_cast<double>(-1);
    model.rebind(&is_classification, &activation, &dummy_value, &dummy_value,
                 &coeff.data()[0], numberOfStages, &layerSizes.data()[0]);
    try {
        indVar = (args[1].getAs<MappedColumnVector>() - x_means).cwiseQuotient(x_stds);
    } catch (const ArrayWithNullException &e) {
        return args[0];
    }
    ColumnVector prediction = MLPTask::predict(model, indVar,
                                               is_classification_response,
                                               is_dep_var_array_for_classification);
    return prediction;
}

} // namespace convex

} // namespace modules

} // namespace madlib
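// A rough sketch of how these functions compose at the database level: the
// transition function runs once per input row (or per pre-packed mini-batch
// matrix), merge combines partial states computed on different segments, and
// final normalizes the accumulated loss and adds the L2 penalty. The
// internal_*_distance functions let the outer driver compare the loss across
// iterations against a tolerance to decide convergence.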