Foro Formación Hadoop
Prepare Your Apache Hadoop Cluster for PySpark Jobs
- by Juliet Hougland
- September 24, 2015
Proper configuration of your Python environment is a critical pre-condition for using Apache Spark’s Python API.
One of the most enticing aspects of Apache Spark for data scientists is the APIs it provides in non-JVM languages: Python (via PySpark) and R (via SparkR). These language bindings have generated a lot of excitement for a few reasons: most data scientists think writing Java or Scala is a drag, they already know Python or R, and they don’t want to learn a new language just to write code for distributed computing. Most importantly, these languages already have a rich variety of numerical libraries with a statistical, machine learning, or optimization focus.
Like everything in engineering, there are tradeoffs to be made when picking these non-JVM languages for your Spark code. Java offers advantages like platform independence by running inside the JVM, self-contained packaging of code and its dependencies into JAR files, and higher performance because Spark itself runs in the JVM. If you choose to use Python, you lose these advantages. In particular, managing dependencies and making them available for PySpark jobs on a cluster can be a pain. In this blog post, I will explain what your options are.
To determine what dependencies are required on the cluster ahead of time, it is important to understand where different parts of Spark code get executed and how computation is distributed on the cluster. Spark orchestrates its operations via the driver program. The driver program initializes a SparkContext, in which you define your data actions and transformations, e.g. map, flatMap, and filter. When the driver program is run, the Spark framework initializes executor processes on the worker nodes that then process your data across the cluster.
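As a minimal sketch (the application name and data here are made up), a driver program looks like the following; the functions passed to map and filter are serialized and run on the executors, not on the driver:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("example-driver")
sc = SparkContext(conf=conf)

# Transformations are declared on the driver but executed by the worker processes.
numbers = sc.parallelize([1, 2, 3, 4, 5])
evens_squared = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x * x)

# collect() is an action: it triggers the distributed computation and returns the results to the driver.
print(evens_squared.collect())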
Self-contained Dependency
If the Python transformations you define use any third-party libraries, like NumPy or nltk, then the Spark executors will need access to those libraries when they execute your code on the remote worker nodes. A common situation is one where we have our own custom Python package that contains functionality we would like to apply to each element of our RDD. A simple example of this is illustrated below. I assume a SparkContext is already initialized as sc, as in the PySpark shell.
from pyspark import SparkConf, SparkContext

def import_my_special_package(x):
    import my.special.package
    return x

conf = SparkConf()
sc = SparkContext(conf=conf)
int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_my_special_package(x))
int_rdd.collect()
After creating a SparkContext, you create a simple rdd of four elements and call it int_rdd. Then you apply the function import_my_special_package to every element of int_rdd. This function just imports my.special.package and then returns the original argument passed to it. Calling it has the same effect as using classes or functions defined in my.special.package, because Spark requires that each Spark executor can import my.special.package when its functionality is needed.
If you only need a single file inside my.special.package, you may direct Spark to make it available to all executors by using the --py-files option in your spark-submit command and specifying the local path to the file. You may also specify this programmatically by using the sc.addPyFile() function. If you use functionality from a package that spans multiple files, you will be better off making an *.egg for the package, as the --py-files flag also accepts a path to an egg file. (Caveat: if your package depends on compiled code and the machines in your cluster have a different CPU architecture than the machine on which you compiled the egg, this will not work.)
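As a rough sketch (the file and script names here are hypothetical), shipping a single file either way looks like this:

# On the command line, ship the file along with the job:
#   spark-submit --py-files /local/path/to/my_special_file.py my_job.py

# Or add it programmatically from the driver, before defining the transformations that need it:
sc.addPyFile("/local/path/to/my_special_file.py")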
In short, if you have a self-contained dependency, there are two ways that you can make the required Python dependency available to your executors:
- If you only depend on a single file, you can use either the --py-files command-line option or programmatically add it to the SparkContext with sc.addPyFile(path), specifying the local path to that Python file.
- If you have a dependency on a self-contained module (meaning a module with no other dependencies), you can create an egg or zip file of that module and use either the --py-files command-line option or programmatically add it to the SparkContext with sc.addPyFile(path), specifying the local path to that egg or zip file. A sketch of this appears after this list.
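For example, assuming a hypothetical self-contained module named my_module with a function some_function, the packaged-module approach looks roughly like this:

# Build a zip of the module on the client machine and ship it with the job, e.g.:
#   zip -r my_module.zip my_module/
#   spark-submit --py-files my_module.zip my_job.py

# Or add it programmatically; Spark distributes the zip to every executor and puts it on their Python path.
sc.addPyFile("/local/path/to/my_module.zip")

def apply_some_function(x):
    from my_module import some_function  # the import resolves on the executor
    return some_function(x)

sc.parallelize([1, 2, 3, 4]).map(apply_some_function).collect()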
Complex Dependency
If the operations you want to apply in a distributed way rely on complex packages that themselves have many dependencies, you have a real challenge. Let’s take the simple snippet below as an example:
from pyspark import SparkContext, SparkConf

def import_pandas(x):
    import pandas
    return x

conf = SparkConf()
sc = SparkContext(conf=conf)
int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_pandas(x))
int_rdd.collect()
Again, all we are doing is importing pandas. Pandas depends on NumPy, SciPy, and many other packages. We have a problem here because we cannot make an egg that contains all of the required dependencies. Instead, we need to have the required Python environment already set up on the cluster and the Spark executors configured to use that Python environment.
Now, let’s consider our options for making these libraries available. While pandas is too complex to just distribute a *.py file that contains the functionality we need, we could theoretically create an *.egg for it and try shipping it off to executors with the --py-files option on the command line or sc.addPyFile() on a SparkContext. A major issue with this approach is that *.egg files for packages containing native code (which most numerically oriented Python packages do) must be compiled for the specific machine they will run on.
Anyone doing distributed computing with commodity hardware must assume that the underlying hardware is potentially heterogeneous. A Python egg built on a client machine will be specific to the client’s CPU architecture because of the required C compilation. Distributing an egg for a complex, compiled package like NumPy, SciPy, or pandas is a brittle solution that is likely to fail on most clusters, at least eventually. This means we should prefer the alternative approach: have our required Python packages already installed on each node of the cluster and specify the path to the Python binaries for the worker nodes to use.
As long as the Python installations you want to use are in a consistent location on your cluster, you can set the PYSPARK_PYTHON environment variable to the path to those Python executables, and Spark will use them as the Python installations for your executors. You can set this environment variable on a per-session basis by executing the following line on the command line:
export PYSPARK_PYTHON=/path/to/python
If you would like to use this PYSPARK_PYTHON definition consistently, you can add that line to your spark-env.sh. In CDH, this script is located at /etc/spark/conf/spark-env.sh. If you set PYSPARK_PYTHON in spark-env.sh, you should check whether users have already set this environment variable, for example with the following lines:
# If a user has already set PYSPARK_PYTHON, override it so every executor uses the cluster-wide interpreter.
if [ -n "${PYSPARK_PYTHON}" ]; then
  export PYSPARK_PYTHON=<path>
fi
If you have complex dependencies like pandas or SciPy, you can create the required Python environment on each node of your cluster and set PYSPARK_PYTHON to the path of the associated Python executable.
Installing and Maintaining Python Environments
Installing and maintaining Python environments on a cluster is not an easy task, but it is the best way to make full use of the Python package ecosystem with PySpark. In the best possible world, you have a good relationship with your local sysadmin and they are able and willing to set up a virtualenv or install the Anaconda distribution of Python on every node of your cluster, with your required dependencies. If you are a data scientist responsible for administering your own cluster, you may need to get creative about setting up your required Python environment on your cluster. If you have sysadmin or devops support for your cluster, use it! They are professionals who know what they are doing. If you are on your own, the following, somewhat fragile, instructions may be useful to you.
If you aren’t yet worrying about long term maintainability and just need to get a Python environment set up yourself, you could take the less maintainable path of setting up virtual environments on your cluster by executing commands on each machine using Cluster SSH, Parallel SSH, or Fabric.
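For instance, here is a rough Fabric (1.x) sketch of that approach; the host names and the environment path are hypothetical, and the commands mirror the instructions shown below:

# fabfile.py -- run with: fab setup_python_env
from fabric.api import env, run, sudo

env.hosts = ['worker1.example.com', 'worker2.example.com', 'worker3.example.com']

def setup_python_env():
    # System-level build dependencies that SciPy needs (requires sudo).
    sudo('yum install -y atlas atlas-devel lapack-devel blas-devel')
    sudo('pip install virtualenv')
    # Create the virtualenv in the same location on every node, then install the packages into it.
    run('virtualenv /opt/mynewenv')
    run('/opt/mynewenv/bin/pip install numpy scipy scikit-learn pandas')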
As an example, I provide instructions for setting up a standard data stack (including SciPy, NumPy, scikit-learn, and pandas) in a virtualenv on a CentOS 6/RHEL 6 system, assuming you have logged into every node in your cluster using cluster ssh and each node has Python and pip installed. (Note that you may need sudo access in order to install operating-system packages such as LAPACK and BLAS.):
# install virtualenv:
pip install virtualenv

# create a new virtualenv:
virtualenv <mynewenv>

# install the non-Python dependencies that SciPy requires and that CentOS does not install by default:
yum install atlas atlas-devel lapack-devel blas-devel

# install the Python packages into the new virtualenv:
<mynewenv>/bin/pip install numpy
<mynewenv>/bin/pip install scipy
<mynewenv>/bin/pip install scikit-learn
<mynewenv>/bin/pip install pandas
Once you have a virtualenv set up in a uniform location on each node in your cluster, you can use it as the Python executable for your Spark executors by setting the PYSPARK_PYTHON environment variable to /path/to/mynewenv/bin/python.
This is not particularly simple or easily maintainable. In a follow-up post I will discuss other options for creating and maintaining Python environments on a CDH cluster.
Acknowledgements
Thanks to Uri Laserson for his invaluable feedback on the blog post. Additional thanks to Sean Owen, Sandy Ryza, Mark Grover, Alex Moundalexis, and Stephanie Bodoff.
Juliet is a Data Scientist at Cloudera, and contributor/committer/maintainer for the Sparkling Pandas project. Juliet was the technical editor for Learning Spark by Karau et al. and Advanced Analytics with Spark by Ryza et al.
Source: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/