Pentaho Data Integration: Getting started with the Spark Execution Engine

  22 May 2017


The release of PDI 7.1 is quite a historic moment. We finally have Spark Batch support, but also a feature called ** Execution Layer** (AEL). AEL is an abstraction layer, meaning you design a DI transformation once and can run it against various engines (Spark, Pentaho and more to come). It’s a similar concept to Apache Beam. This is really amazing! Remember when Pentaho released MapReduce support? You had to design your transformation in a special way to fit into the M/R paradigm. The AEL takes away this complexity! I recommend reading Hemal’s blog post on PDI Spark, which provides an excellent overview.

Official Documentation:

Initial Setup

PDI

PDI Download:

  • PDI-CE v7.1 from here.

Unzip it and move the folder to a convenient location. Make the shell files executable.

Hadoop and Spark

You will also need to have a Hadoop and Spark environment running somewhere

Either: Option 1:

Use a VM or Docker. VMs or Docker image is the easiest way to go, just source them directly from Cloudera, Hortenworks, MapR etc.

Or: Option 2:

Use a minimum local installation as described in my Setting up a Hadoop Dev Environment for Pentaho Data Integration.

If you chose Option 1 above, download the Spark Client from here. PDI AEL depends on this Spark client. Extract the file.

If you went for Option 2, the Spark Client will already be part of your setup.

The environment variable SPARK_HOME will point to the Spark Client folder.

Configure Big Data Shim

This shouldn’t really be necessary when using Spark only since all the communication goes via the Spark Client. However, if you want to use any of the Hadoop steps within PDI, configuring the shim will definitely be required.

Next configure the Big Data Shim, an abstraction layer which enables PDI to work with your Hadoop distro. Details on how to do this are available in various articles online (it’s also covered in Setting up a Hadoop Dev Environment for Pentaho Data Integration).

Start spoon.sh to confirm that the basis setup is working.

Configuring Spark engine for the Adaptive Execution Layer

Setting up the Adaptive Execution Layer (AEL):

In production, the AEL daemon has to be installed on an edge node. The AEL deamon is part of the PDI client. The PDI client connects to the ZooKeeper server on the cluster, which establishes the communicate between the Spark cluster and the AEL daemon. Jobs and transformations are launch via the AEL daemon and run on the Spark cluster. For production, you will need to disable the Pentaho embedded ZooKeeper which ships with PDI and set up AEL to use the ZooKeeper server on the cluster

“The Pentaho Spark application [Driver] is a subset of PDI used to run PDI transformations with the Spark engine in YARN.”

Please find below the overview of a production setup:

component module environment variable prod location description
PDI AEL Deamon PDI_AEL_DAEMON_HOME edgenode subset of PDI files required to run with Spark, contained with the PDI Spark Application/Driver data-integration folder, which has to be deployed onto an edgenode
PDI AEL Spark executor PDI_SPARK_EXECUTOR_LOCATION HDFS includes only the required libraries needed by the Spark nodes themselves to execute a transformation
Pentaho Server AEL Deamon PDI_AEL_DAEMON_HOME edgenode See above

There are various ways to deploy it:

  • Production Environment: PDI will be on one edgenode, the AEL Daemon on another edgenode, the AEL Spark Executor libs on HDFS and finally the Pentaho Server on one edgenode.
  • Dev Environment: Either use a VM or Docker image(s). Alternatively install all the modules on your machine. No need to run Spark in standalone mode. The AEL Daemon can be set up to access Spark in local mode.

Pentaho Server AEL Configuration

This step is not required for our local dev environment setup and only here to explain how you would set up the Pentaho Server on a separate machine with AEL support:

Pentaho Server 7.1 includes the AEL daemon. If you are running it on a separate machine, download it from here and extract it in a convenient location. Make all shell files in all directories executable. Start the server by running:

sh start-pentaho.sh

Visit the Pentaho web interface on the following URL to make sure everything is working:

http://localhost:8080/pentaho/Login

It might take some time for the page to load initially.

Next add the environment variable PDI_AEL_DAEMON_HOME to your bashrc or bash_profile and point it to data-integration/adaptive-execution. Adjust below as required:

export PENTAHO_SERVER_HOME=/home/dsteiner/apps/pentaho-server-ce-7.1
export PDI_AEL_DAEMON_HOME=$PENTAHO_SERVER_HOME/data-integration/adaptive-execution

Don’t forget to source .bashrc/.bash_profile.

If you don’t have Spark installed, download spark-2.1.0-bin-hadoop2.7 from here. Just extract it in a convenient location and add SPARK_HOME to your .bashrc or .bash_profile and point it to the Spark root directory.

Create PDI Spark App/Driver

Next we have to bundle all dependencies to run PDI jobs and transformations within Spark (see also official docu.

PDI ships with a command line tool called spark-app-builder.sh (find it in the PDI root dir). You can exclude plugins by specifying the -e flag and change the output location from the PDI root directory to a custom one by using the -o flag:

export WORK_DIR=/home/dsteiner/apps/
sh spark-app-builder.sh -o $WORK_DIR

Important: For some weird reason the folder containing the PDI installation must be named data-integration. You cannot change it currently to something else since you will get an error when running this script. I created this JIRA case.

This script will create the pdi-spark-driver.zip file, which contains a data-integration folder and pdi-spark-executor.zip file.

Deploy the PDI Spark App/Driver

In a production environment, using the extracted files from the pdi-spark-driver.zip:

  • you have to copy the data-integration folder to the edge node where you want to run the AEL daemon. Create an environment variable called PDI_AEL_DAEMON_HOME to point to this location (see for detail further down this article).
  • If you are planning to run Spark on Yarn, copy the pdi-spark-executor.zip file to the HDFS where you will run Spark and extract the contents.

In my case I have a local dev environment with Hadoop and Spark natively installed (so no VMs or Docker images) and I configured PDI like so:

I extracted pdi-spark-driver.zip in a convenient location and added the PDI_AEL_DAEMON_HOME environment variable to .bashrc.

cd $WORK_DIR
unzip pdi-spark-driver.zip -d pdi-ce-7.1-spark
cd pdi-ce-7.1-spark
# next two steps only require if running via Yarn
hdfs dfs -mkdir -p /opt/pentaho/spark
hdfs dfs -put pdi-spark-executor.zip /opt/pentaho/spark

Then I added this to .bashrc or .bash_profile (amend):

export PDI_SPARK_HOME=/home/dsteiner/apps/pdi-ce-7.1-spark
export PDI_SPARK_APP_HOME=$PDI_SPARK_HOME/data-integration
export PDI_AEL_DAEMON_HOME=$PDI_SPARK_APP_HOME/adaptive-execution
# export PDI_SPARK_EXECUTOR_LOCATION=hdfs:/opt/pentaho/spark/pdi-spark-executor.zip
export PDI_SPARK_EXECUTOR_LOCATION=$PDI_SPARK_HOME/pdi-spark-executor.zip

Source the file.

Configuring the PDI AEL Daemon

“The setenv script runs each time the AEL daemon is started and sets the values for the environment properties. In the setenv file, the SPARK_HOME and the SPARK_APP variable values must be manually entered.”

Important: You can either manually change the config in setenv file or use the pdi-deamon script to change the settings for the etc/org.pentaho.pdi.engine.daemon.cfg file. The command line utility (script) is only available on Unix-based OSes (but you can just change etc/org.pentaho.pdi.engine.daemon.cfg manually as well).

The property values to use for the custom configuration of the daemon can exist as either environmental variables or as shell variables. You can set an environment variable as a property value using the ./pdi-daemon config command.

Manually changing the config file

We have to set the SPARK_HOME and SPARK_APP variables in the setenv script in case they are not already defined system wide (you should really have them define system wide, e.g. in bashrc, in which case you do not have to do this step here):

cd $PDI_AEL_DAEMON_HOME
vi setenv

For my local setup I only changed the below shown properties. Everything was left commented.

SPARK_HOME="$SPARK_HOME"
SPARK_APP="$PDI_SPARK_APP_HOME"

Important: Make sure you enclose the values in double quotes in setenv!

Note: You can run ./pdi-daemon config --quiet to write the settings from setenv to etc/org.pentaho.pdi.engine.daemon.cfg. You can then run ./pdi-daemon config --daemon --list to confirm that the settings were applied to the AEL daemon (or alternatively open etc/org.pentaho.pdi.engine.daemon.cfg in a text editor). Normally there is no need to execute ./pdi-daemon config --quiet, since ./pdi-daemon start will automatically do this for you. So to summarise the role of setenv and etc/org.pentaho.pdi.engine.daemon.cfg: The latter one is the configuration used by the AEL daemon. You can use setenv to define variables that are automatically propagated on startup to etc/org.pentaho.pdi.engine.daemon.cfg.

Using the command line utility

Again, only use the command line utility if you haven’t changed the setenv file manually (It’s one or the other).

The pdi-daemon located in $PDI_AEL_DAEMON_HOME/adaptive-execution allows you to change the configuration properties for the AEL Deamon.

You can also disable the Pentaho Zookeeper, which is shipped with AEL and instead point to your cluster’s Zookeeper.

cd $PDI_AEL_DAEMON_HOME
# see current AEL daemon configuration
./pdi-daemon config --daemon --list
# see current Zookeeper configuration
./pdi-daemon config --zookeeper server --list
# change property value
./pdi-daemon config --daemon <property name> <property value>
# enable/disable zookeeper
./pdi-daemon config --zookeeper <client|server [enable|disable]>

I made the following changes for my local setup:

$ ./pdi-daemon config --daemon --list
sparkHome=/home/cloudera/spark-2.1.0-bin-hadoop2.7/
sparkApp=/home/cloudera/pdi-ee-client-7.1-SNAPSHOT/data-integration
assemblyZip=hdfs:/opt/pentaho/pdi-ee-client-7.1-SNAPSHOT.zip
driverDebugPort=-1
executorDebugPort=-1
suspendDebug=true
sparkDriverExtraJavaOptions=-Dlog4j.configuration=file:${sparkApp}/classes/log4j.xml
sparkDriverMemory=4g
sparkExecutorMemory=1g
driverTimeoutMillis=240000
hadoopConfDir=/home/cloudera/yarn-conf
hadoopUser=devuser
sparkMaster=local
sparkDeployMode=client
keytabName=devuser.keytab
kerberosPrincipal=devuser
disableProxyUser=false
overwriteConfig=true
$ ./pdi-daemon config --daemon sparkHome $SPARK_HOME
$ ./pdi-daemon config --daemon sparkApp $PDI_SPARK_APP_HOME
$ ./pdi-daemon config --daemon assemblyZip $PDI_SPARK_EXECUTOR_LOCATION
$ ./pdi-daemon config --daemon hadoopConfDir $HADOOP_CONF_DIR
$ ./pdi-daemon config --daemon hadoopUser $USER
$ ./pdi-daemon config --daemon --list
sparkHome=/home/dsteiner/apps/spark-2.1.0-bin-hadoop2.7
sparkApp=/home/dsteiner/apps/pdi-ce-7.1-spark-app
assemblyZip=/home/dsteiner/apps/pdi-ce-7.1-spark-app/pdi-spark-executor.zip
driverDebugPort=-1
executorDebugPort=-1
suspendDebug=true
sparkDriverExtraJavaOptions=-Dlog4j.configuration=file:${sparkApp}/classes/log4j.xml
sparkDriverMemory=4g
sparkExecutorMemory=1g
driverTimeoutMillis=240000
hadoopConfDir=/home/dsteiner/apps/hadoop-2.8.0/etc/hadoop
hadoopUser=dsteiner
sparkMaster=local
sparkDeployMode=client
keytabName=devuser.keytab
kerberosPrincipal=devuser
disableProxyUser=false
overwriteConfig=true

IMPORTANT: These config command amends the etc/org.pentaho.pdi.engine.daemon.cfg!

FACTORY RESET: If you ever get yourself into a tricky situation and want to restore the config, you can always reset the config by running ./pdi-daemon config --reset

The official docu has also a dedicated section on running AEL Daemon with your own Zookeeper Server - see for more details there.

If you want to change the settings for the AEL embedded Zookeeper Server, directly edit the properties in the Embedded Zookeeper configuration properties section of the setenv file (or alternatively use the pdi-daemon config utility mentioned above).

Running in YARN

There is an additional configuration to run PDI Spark in YARN:

cd $PDI_AEL_DAEMON_HOME
vi etc/org.pentaho.pdi.engine.daemon.cfg

Note: If you are on Linux, you can use the command line utility pdi-daemon as well to change the settings!

By default the Spark is assumed to run in local mode (sparkMaster=local, I used this latter setup for my local dev env).

Change the following property values to match this:

hadoopUser=dsteiner
sparkMaster=yarn
sparkDeployMode=client
assemblyZip=hdfs:/opt/pentaho/spark/pdi-spark-executor.zip

Important: sparkDeployMode server is not yet supported (as stated in the config file).

Note: For my local dev environment I kept sparkMaster set to local.

Note: For my local dev env I set hadoopConfDir to my local Hadoop install director config. For other environments, you must copy the clusters *site.xml files to a convenient location on your machine (chances are that you have already done this when setting up the PDI Hadoop Shim any ways) and point to this location.

There are a lot more config settings, please see official docu.

Properties explained

Below is a overview of some common properties in setenv and etc/org.pentaho.pdi.engine.daemon.cfg.

  • SPARK_HOME, sparkHome: should point to the root directory of your Spark installation
  • SPARK_APP, sparkApp: should point to the data-integration directory generated by spark-app-builder.sh (the unzip version of it).
  • SPARK_MASTER, sparkMaster: local runs one executer, local[x] runs x executors, specify yarn when you want to run the Spark process on YARN (who guessed?!). If you define local mode, you do not have to have a running local standalone Spark service.
  • SPARK_DEPLOY_MODE, sparkDeployMode: This property is only used when the master is set to yarn:
    • client - Driver will run on the daemon machine; but the executors will run in Yarn
    • cluster - Entire application will be run in cluster (Not supported yet)
  • HADOOP_CONF_DIR, hadoopConfDir: Points to a directory with the clusters -site.xml config files. (can be a local copy of these config files) -HADOOP_USER, hadoopUser`: The user name you want to use to access or write files on HDFS.
  • ASSEMBLY_ZIP, assmeblyZip: Location of the PDI Spark executor zip file. This can either be on the local file system or on HDFS when deployed on the cluster.

Starting the PDI AEL Daemon

Here are the essential commands:

cd $PDI_AEL_DAEMON_HOME
# start daemon
./pdi-daemon start
# start daemon in the foreground / interactively
./pdi-daemon start -i
# stop daemon
./pdi-daemon stop
# show status of daemon
./pdi-daemon status

If you are using the AEL embedded Zookeeper Server it is worth checking that it is available on the specified port:

sudo netstat -anp | grep 2181

Note: If you want to exit interactive mode, press CTRL+C. This will bring you to the Karaf Shell. Type system:shutdown or shell:logout to exit completely.

Checking Setup of Dev Environment

So far we have created following environment variables in .bashrc (example, adjust paths as required):

export PDI_SPARK_HOME=/home/dsteiner/apps/pdi-ce-7.1-spark
export PDI_SPARK_APP_HOME=$PDI_SPARK_HOME/data-integration
export PDI_AEL_DAEMON_HOME=$PDI_SPARK_APP_HOME/adaptive-execution
# export PDI_SPARK_EXECUTOR_LOCATION=hdfs://opt/pentaho/spark/pdi-spark-executor.zip
export PDI_SPARK_EXECUTOR_LOCATION=$PDI_SPARK_HOME/pdi-spark-executor.zip

We have following components installed:

Module Path
PDI /user/dsteiner/home/apps/data-integration
PDI Spark App /home/dsteiner/apps/pdi-ce-7.1-spark/data-integration
PDI AEL Daemon /home/dsteiner/apps/pdi-ce-7.1-spark/data-integration/adaptive-execution
Spark executors /home/dsteiner/apps/pdi-ce-7.1-spark/pdi-spark-executor.zip

We have following services running:

  • Hadoop
  • PDI AEL Daemon
  • PDI Spoon

Important: There is no need to start any Spark service! PDI will do this for you (start the Spark client)!

Create Spark Run Configuration in Spoon

Go to the View tab and right click on Run configurations to set up a new Spark connection (see the official docu for details). You can set/change various properties (see here for more details).

Run Configuration Spark: Note that the Spark Host URL is not the web interface URL but the Zookeeper URL. These URL depends on how you configured the AEL Daemon: This can either be the AEL Deamon embedded Zookeeper Server or the Hadoop cluster Zookeeper Server.

If you use the AEL Deamon embedded Zookeeper Server, you can see the port by running following command (If you didn’t change it, the default settings in PDI will work):

cd $PDI_AEL_DAEMON_HOME
# see current Zookeeper configuration
vi setenv
# take a look at the embedded Zookeeper section

When you execute the transformation, pick the new Spark run configuration you just created. In my local dev environment it took quite some time for the transformation to run.

Monitoring PDI Spark Job

Spark Web UI:

I couldn’t find the job there.

Yarn: If executed in Yarn mode, you should find logs there.

You can also find details in the AEL Daemon log:

17/05/23 22:53:11 INFO Client: Uploading resource hdfs://opt/pentaho/spark/pdi-spark-executor.zip -> file:/home/dsteiner/.sparkStaging/application_1495574114174_0003/pdi-spark-executor.zip

Troubleshooting AEL

The official docu has a section on this topic.

Some other points:

It’s worth to start the AEL in interactive mode and watch the log. Some errors are not shown with the usual ERROR indicator, e.g. see the message below:

2017-05-23 22:45:52,527 | INFO  | launcher-proc-2  | SparkMain                        | 57 - wrap_file__home_dsteiner_apps_pdi-ce-7.1-spark-app_data-integration_adaptive-execution_system_org_apache_spark_spark-launcher_2.11_2.1.0_spark-launcher_2.11-2.1.0.jar - 0.0.0 | Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: hdfs:/$HDFS_SPARK_EXECUTOR_LOCATION/pdi-spark-executor.zip

Well, it looks like in this case the env variable couldn’t be replaced, so I had to hard code the value in the config.

Next Problem:

Other Spark Config settings you might have, like Kyro libs, they might interfere: Check $SPARK_HOME/conf/spark-defaults.conf

Next Problem

19 - org.apache.hadoop.zookeeper - 3.4.7 | Got user-level KeeperException when processing sessionid:0x15c39a3f4e70000 type:create cxid:0x54 zxid:0x2c7 txntype:-1 reqpath:n/a Error Path:/osgi/service_registry/org/pentaho/di/engine/api/remote Error:KeeperErrorCode = NodeExists for /osgi/service_registry/org/pentaho/di/engine/api/remote

AEL Daemon is already running, shut it down properly.

Next Problem:

$ ./pdi-daemon start -ipdi-daemon: SPARK_APP not set; results may vary

Immediately after starting the AEL Daemon, the above error message shows up. Make sure you set SPARK_APP in the setenv script.

To Parallelise Or Not To Parallelise

An interesting note that Hemal drops in his blog post is that the decision if a step can run in parallel or not on the Spark cluster is defined in:

$PDI_AEL_DAEMON_HOME/system/karaf/etc/org.pentaho.pdi.engine.spark.cfg

The forceCoalesceSteps property initially contains a list of EE supported steps which cannot be run in parallel and hence have to be executed on a single node. All the data will be moved to this single node and the process will be executed in a single thread. You can extend this list with any other plugins that you use, which cannot run in parallel.

When you look through the list you will realise that the GroupBy step is currently not supported in parallel mode.

Other important points

  • Run configurations are associated with the Transformation job entry, so you cannot just ran a standard Transformation on AEL Spark - you always require a job to run it.

Open Questions

[OPEN] In SPARK_HOME PDI creates:

spark-2.1.0-bin-hadoop2.7/kettleConf/spark-defaults.conf

What properties are there?

Is this used instead of $SPARK_HOME/conf/spark-defaults.conf?

Is if YARN is the only supported resource negotiator? Yes

PDI Version 8 Improvements

At Pentaho World 2017 following improvements were mentioned (Source):

  • Communicates with Pentaho client tools over WebSocket; does NOT require Zookeeper
  • Uses distro-specific Spark library
  • Enhanced Kerberos impersonation on client-side

This brings a bunch of benefits:

  • Reduced number of steps to setup 
  • Enable fail-over, load-balancing
  • Robust error and status reporting 
  • Customization of Spark jobs (i.e. memory , settings)
  • Client to AEL connection can be secured
  • Kerberos impersonation from client tool 

Other improvements:

  • Big Data File Formats - Avro and Parquet steps: When you run in AEL, these will also be natively interpreted by the engine, which adds a lot to the value of this.

Other notes:

  • There is still no improvement on the Group By step. Basically, everything still has to go via a single node (Spark client) - there is no proper Spark support to run it in parallel.
  • There is no support in AEL Spark Streaming for event time in a windowing function. Windowing can be achieved (as highlighted in Pedro’s blog) by using a sub-transformation (you can define the batch interval in the job entry settings) and then you can do the aggregation in the sub-transformation based on processing time (not event, not ingestion time).
  • Spark client: Is there still a limitation to run it on the edgenode? Yes.
comments powered by Disqus