Foro Formación Hadoop

Prepare Your Apache Hadoop Cluster for PySpark Jobs

 
by Admin Formación Hadoop - Wednesday, 30 September 2015, 10:24 AM
 

Proper configuration of your Python environment is a critical pre-condition for using Apache Spark’s Python API.

One of the most enticing aspects of Apache Spark for data scientists is the API it provides in non-JVM languages: Python (via PySpark) and R (via SparkR). These language bindings have generated a lot of excitement for a few reasons: most data scientists find writing Java or Scala a drag, and they already know Python or R, or don't want to learn a new language just to write code for distributed computing. Most important, these languages already offer a rich variety of numerical libraries with a statistical, machine learning, or optimization focus.

Like everything in engineering, there are tradeoffs to be made when picking these non-JVM languages for your Spark code. Java offers advantages like platform independence by running inside the JVM, self-contained packaging of code and its dependencies into JAR files, and higher performance because Spark itself runs in the JVM. If you choose to use Python, you lose those advantages. In particular, managing dependencies and making them available for PySpark jobs on a cluster can be a pain. In this blog post, I will explain what your options are.

To determine what dependencies are required on the cluster ahead of time, it is important to understand where different parts of Spark code get executed and how computation is distributed on the cluster. Spark orchestrates its operations via the driver program. The driver program initializes a SparkContext, in which you define your data actions and transformations, e.g. map, flatMap, and filter. When the driver program is run, the Spark framework initializes executor processes on the worker nodes that then process your data across the cluster.

Self-contained Dependency

If the Python transformations you define use any third-party libraries, like NumPy or nltk, then the Spark executors will need access to those libraries when they execute your code on the remote worker nodes. A common situation is one where we have our own custom Python package that contains functionality we would like to apply to each element of our RDD. A simple example of this is illustrated below. I assume a SparkContext is already initialized as sc, as in the PySpark shell.
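A minimal sketch of such a snippet (my.special.package stands in for your own custom module):

    def import_my_special_package(x):
        # The import runs inside the executor process that evaluates this element.
        import my.special.package
        return x

    int_rdd = sc.parallelize([1, 2, 3, 4])
    # collect() triggers the lazy map, so every executor involved must be able
    # to import my.special.package.
    int_rdd.map(import_my_special_package).collect()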

After creating a SparkContext, you create a simple RDD of four elements called int_rdd, then apply the function import_my_special_package to every element of int_rdd. This function just imports my.special.package and returns the original argument passed to it. Using classes or functions defined in my.special.package would have the same effect, because Spark requires that each executor be able to import my.special.package whenever its functionality is needed.

If you only need a single file inside my.special.package, you can direct Spark to make it available to all executors by using the --py-files option in your spark-submit command and specifying the local path to the file. You can also do this programmatically with the sc.addPyFile() function. If you use functionality from a package that spans multiple files, you will be better off making an *.egg for the package, as the --py-files flag also accepts a path to an egg file. (Caveat: if your package depends on compiled code and the machines in your cluster have a different CPU architecture than the machine you compile your egg on, this will not work.)

In short, if you have a self-contained dependency, there are two ways to make it available to your executors, as shown in the example after this list:

  • If you only depend on a single Python file, you can use either the --py-files command-line option or programmatically add it to the SparkContext with sc.addPyFile(path), specifying the local path to that Python file.
  • If you depend on a self-contained module (meaning a module with no other dependencies), you can create an egg or zip file of that module and use either the --py-files command-line option or programmatically add it to the SparkContext with sc.addPyFile(path), specifying the local path to that egg or zip file.
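For example, a sketch of both approaches (my_module.py and my_pyspark_job.py are placeholder names):

    spark-submit --py-files my_module.py my_pyspark_job.py

or, from inside the driver program:

    sc.addPyFile("my_module.py")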

Complex Dependency

If the operations you want to apply in a distributed way rely on complex packages that themselves have many dependencies, you have a real challenge. Let’s take the simple snippet below as an example:
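A sketch of such a snippet, again assuming a SparkContext is already initialized as sc (the helper name import_pandas is just for illustration):

    def import_pandas(x):
        # pandas, and everything pandas itself depends on, must be importable
        # on the executor that evaluates this element.
        import pandas
        return x

    int_rdd = sc.parallelize([1, 2, 3, 4])
    int_rdd.map(import_pandas).collect()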

Again, all we are doing is importing pandas. Pandas depends on NumPy, SciPy, and many other packages. We have a problem here because we cannot make an egg that contains all of the required dependencies. Instead, we need to have the required Python environment already set up on the cluster and the Spark executors configured to use that Python environment.

Now, let’s consider our options for making these libraries available. While pandas is too complex to just distribute as a *.py file that contains the functionality we need, we could theoretically create an *.egg for it and try shipping it off to executors with the --py-files option on the command line or sc.addPyFile() on the SparkContext. A major issue with this approach is that *.egg files for packages containing native code (which most numerically oriented Python packages do) must be compiled for the specific machine they will run on.

Anyone doing distributed computing with commodity hardware must assume that the underlying hardware is potentially heterogeneous. A Python egg built on a client machine will be specific to that client’s CPU architecture because of the required C compilation. Distributing an egg for a complex, compiled package like NumPy, SciPy, or pandas is a brittle solution that is likely to fail on most clusters, at least eventually. This means we should prefer the alternative approach: have our required Python packages already installed on each node of the cluster and specify the path to the Python binaries for the worker nodes to use.

As long as the Python installation you want to use is in a consistent location on your cluster, you can set the PYSPARK_PYTHON environment variable to the path of that Python executable, and Spark will use it as the Python installation for your executors. You can set this environment variable on a per-session basis by executing the following line on the command line:
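For example (the path is a placeholder for wherever the desired Python lives on your nodes):

    export PYSPARK_PYTHON=/path/to/python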

If you would like to use this PYSPARK_PYTHON definition consistently, you can add that line to your spark-env.sh; in CDH, this script is located at /etc/spark/conf/spark-env.sh. If you set PYSPARK_PYTHON in spark-env.sh, you should first check that users have not already set this environment variable, with lines like the following:
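A sketch of such a guard (again, the path is a placeholder):

    if [ -z "${PYSPARK_PYTHON}" ]; then
        export PYSPARK_PYTHON=/path/to/python
    fi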

If you have complex dependencies like pandas or SciPy, you can create the required Python environment on each node of your cluster and set PYSPARK_PYTHON to the path to the associated Python executable.

Installing and Maintaining Python Environments

Installing and maintaining Python environments on a cluster is not an easy task, but it is the best solution for making full use of the Python package ecosystem with PySpark. In the best possible world, you have a good relationship with your local sysadmin and they are able and willing to set up a virtualenv, or install the Anaconda distribution of Python, on every node of your cluster with your required dependencies. If you are a data scientist responsible for administering your own cluster, you may need to get creative about setting up your required Python environment. If you have sysadmin or devops support for your cluster, use it! They are professionals who know what they are doing. If you are on your own, the following, somewhat fragile, instructions may be useful.

If you aren’t yet worrying about long term maintainability and just need to get a Python environment set up yourself, you could take the less maintainable path of setting up virtual environments on your cluster by executing commands on each machine using Cluster SSH, Parallel SSH, or Fabric.

As an example, I provide instructions for setting up a standard Python data stack (including SciPy, NumPy, scikit-learn, and pandas) in a virtualenv on a CentOS 6/RHEL 6 system, assuming you have logged into every node in your cluster using Cluster SSH and each node has Python and pip installed. (Note that you may need sudo access in order to install operating-system packages such as LAPACK and BLAS.)
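A sketch of the commands involved (the package names are the typical CentOS/RHEL ones and may differ on your system; /path/to/mynewenv is a placeholder for a location that exists on every node):

    # System libraries and compilers needed to build NumPy/SciPy (requires sudo)
    sudo yum install -y python-devel lapack-devel blas-devel gcc gcc-c++ gcc-gfortran
    # Create a virtualenv in the same location on every node and install the stack into it
    sudo pip install virtualenv
    virtualenv /path/to/mynewenv
    source /path/to/mynewenv/bin/activate
    pip install numpy scipy scikit-learn pandas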

Once you have a virtualenv set up in a uniform location on each node in your cluster, you can use it as the Python executable for your Spark executors by setting the PYSPARK_PYTHON environment variable to /path/to/mynewenv/bin/python.

This is not particularly simple or easily maintainable. In a follow-up post I will discuss other options for creating and maintaining Python environments on a CDH cluster.

Acknowledgements

Thanks to Uri Laserson for his invaluable feedback on the blog post. Additional thanks to Sean Owen, Sandy Ryza, Mark Grover, Alex Moundalexis, and Stephanie Bodoff.

Juliet is a Data Scientist at Cloudera, and contributor/committer/maintainer for the Sparkling Pandas project. Juliet was the technical editor for Learning Spark by Karau et al. and Advanced Analytics with Spark by Ryza et al.

 

Source: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/