Pentaho Data Integration v8: Getting started with the Spark Execution Engine

  16 Nov 2017


PDI AEL Spark Version 8 Improvements

At Pentaho World 2017 the following improvements were mentioned (Source):

  • Communicates with Pentaho client tools over WebSocket; does NOT require Zookeeper
  • Uses distro-specific Spark library
  • Enhanced Kerberos impersonation on client-side

This brings a bunch of benefits:

  • Reduced number of setup steps
  • Enables fail-over and load-balancing
  • Robust error and status reporting
  • Customization of Spark jobs (e.g. memory settings)
  • Client to AEL connection can be secured
  • Kerberos impersonation from the client tool

Other improvements:

  • Big Data File Formats - Avro and Parquet steps: when you run on AEL, these steps are interpreted natively by the Spark engine, which makes them considerably more valuable.

Other notes:

  • There is still no improvement on the Group By step. Basically, everything still has to go via a single node (Spark client) - there is no proper Spark support to run it in parallel.
  • There is no support in AEL Spark Streaming for event time in a windowing function. Windowing can be achieved (as highlighted in Pedro’s blog) by using a sub-transformation (you can define the batch interval in the job entry settings) and then doing the aggregation in the sub-transformation based on processing time (not event time, not ingestion time).
  • Spark client: Is it still limited to running on the edge node? Yes.

Official Documentation:

Setup for v8

Spark Client

If not already installed on your local machine, download the Spark Client from here.

The environment variable SPARK_HOME will point to the Spark Client folder.
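For example, on Linux this could look as follows (version and paths are purely illustrative; pick whatever matches your setup):

# download and extract a Spark client build (example version and path)
cd ~/apps
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xzf spark-2.1.0-bin-hadoop2.7.tgz
# point SPARK_HOME at the extracted folder
export SPARK_HOME=~/apps/spark-2.1.0-bin-hadoop2.7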

PDI

PDI Download:

  • PDI-CE v8.0 from here.

Unzip it and move the folder to a convenient location. Make the shell files executable.
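For example (the exact zip file name depends on the build you downloaded):

# unpack the PDI client and make the shell scripts executable
unzip pdi-ce-8.0*.zip -d ~/apps
cd ~/apps/data-integration
chmod +x *.sh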

Configuration

Local Setup for Development

The exciting news in PDI v8 is that Pentaho took all the complexity out of the local setup! It is really easy now to get going with PDI and Spark!

AEL Daemon: The Adaptive Execution Layer is now part of the PDI client! You can find it in the adaptive-execution folder within <pdi-client-root>. Note that the environment variable PDI_AEL_DAEMON_HOME does not point to this folder but to its parent, which is <pdi-client-root>.
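For instance (the path is just an example, matching the configuration shown further down):

# PDI_AEL_DAEMON_HOME points to the PDI client root, not the adaptive-execution subfolder
export PDI_AEL_DAEMON_HOME=/home/dsteiner/apps/pdi-ce-8.0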

Adjust the configuration of the AEL Daemon by adjusting this file:

data-integration/adaptive-execution/config/application.properties

Change the following properties:

property       value                                                                    environment variable
sparkHome      path to your local Spark Client                                          SPARK_HOME
sparkApp       path to your AEL Daemon data-integration folder                          PDI_AEL_DAEMON_HOME
hadoopConfDir  path to the directory containing the Hadoop config files (*-site.xml)    HADOOP_CONF_DIR

Here is my example configuration with hard-coded values (important bits shown only):

# Hadoop Configuration
# ----------------------------------------------------------------------------------------------------
# Directory where *-site.xml files reside for Hadoop Configuration
hadoopConfDir=/home/dsteiner/apps/hadoop-2.8.0/etc/hadoop

# Hadoop User which will run job
hadoopUser=dsteiner

# Spark Configuration
# ----------------------------------------------------------------------------------------------------
# Location of Apache Spark Client distribution
sparkHome=/home/dsteiner/apps/spark-2.1.0-bin-hadoop2.7

# Spark Master
# Where spark will run: local / yarn
# Note, you can also simulate multiple executors by using this notation (i.e. 2 executors):  local[2]
sparkMaster=local[2]

# Deploy Mode
# This property is only used when the master is set to `yarn`.
# client  - Driver will run on the daemon machine; but the executors will run in Yarn
# cluster - Entire application will be run in cluster (Not supported yet)
sparkDeployMode=client

# AEL Spark Properties
# ----------------------------------------------------------------------------------------------------
# Main PDI Driver location (can be pdi-ee-client or pdi-spark-driver)
sparkApp=/home/dsteiner/apps/pdi-ce-8.0

Alternatively, you can set the environment variables in your .bashrc (for example):

export HADOOP_CONF_DIR=<your-value>
export SPARK_HOME=<your-value>
export PDI_AEL_DAEMON_HOME=<your-value>

You can reference them inside the config like so:

# Hadoop Configuration
# ----------------------------------------------------------------------------------------------------
# Directory where *-site.xml files reside for Hadoop Configuration
hadoopConfDir=${HADOOP_CONF_DIR}

# Hadoop User which will run job
hadoopUser=dsteiner

# Spark Configuration
# ----------------------------------------------------------------------------------------------------
# Location of Apache Spark Client distribution
sparkHome=${SPARK_HOME}

# Spark Master
# Where spark will run: local / yarn
# Note, you can also simulate multiple executors by using this notation (i.e. 2 executors):  local[2]
sparkMaster=local[2]

# Deploy Mode
# This property is only used when the master is set to `yarn`.
# client  - Driver will run on the daemon machine; but the executors will run in Yarn
# cluster - Entire application will be run in cluster (Not supported yet)
sparkDeployMode=client

# AEL Spark Properties
# ----------------------------------------------------------------------------------------------------
# Main PDI Driver location (can be pdi-ee-client or pdi-spark-driver)
sparkApp=${PDI_AEL_DAEMON_HOME}

Next create a directory called kettleConf inside your Spark Client’s root directory:

cd $SPARK_HOME
mkdir kettleConf

Note: If the kettleConf folder already exists and has a few files in it, it is probably best to remove these files.

Note: The Spark Client will be called by PDI, so you do not have to start it independently. Nothing to do. Simple.

Next start the AEL Daemon like so:

cd $PDI_AEL_DAEMON_HOME
# run as foreground process - great for debugging
sh ./daemon.sh
# or start as a background process
sh ./daemon.sh start
# check the status if in background mode
./daemon.sh status

Create Run Configuration

Now that the AEL Daemon is running, we can create a Run Configuration in Spoon. You can think of this as configuring just another engine to run your jobs or transformations with.

Setting up a Run Configuration in Spoon is very easy:

  1. Go to the View tab.
  2. Right click on Run configurations and choose New.
  3. In the Run configuration dialog, provide a name and description. As the Engine, choose Spark. The default connection entries should be just fine for your local setup (http://127.0.0.1:53000).

Note: You might wonder what port 53000 refers to: it is the port defined for the AEL Daemon in adaptive-execution/config/application.properties.
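If you are unsure whether the daemon is actually up, a quick check with standard tools (nothing PDI-specific) is to look for a listener on that port:

# confirm something is listening on the AEL Daemon port
ss -ltn | grep 53000
# or probe it and just look at the HTTP status code
curl -s -o /dev/null -w "%{http_code}\n" http://127.0.0.1:53000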

Run Configuration Dialog

When you execute your job or transformation next time, pick this Run Configuration.

FAQ

  • Q: Does the Spark client or server have to be running for PDI AEL to work? A: No. PDI AEL will start the Spark Client by itself.
  • Q: Is it necessary to have Spark set up to connect to HDFS? A: No. If you want to run Hadoop-specific steps/job entries, sparkMaster must be set to yarn. You can simulate this locally by running a pseudo-distributed Hadoop cluster. If you want to do this, configure the AEL Daemon as shown in the next section and also upload pdi-spark-executor.zip to HDFS.
  • Q: Is it necessary to have HDFS running? A: No, unless you changed the AEL Daemon config so that everything runs against YARN.

Spark Logging

One of the great things about AEL is that the essential logs are available straight in Spoon/PDI. Remember Pentaho MapReduce? Then you might also remember that you actually had to fetch the logs from YARN to understand what was going on.

While AEL has you covered on the logging side, you might still be extremely curious and want to see the full Spark logs. Here is the pro-tip from Matt Campell:

For spark logging, the trick is to first set

overwriteConfig=false

in the daemon’s config/application.properties file.

Important: Make sure you restart the daemon after changing the config.

This will allow the $SPARK_HOME/kettleConf/spark-defaults.conf file to persist across executions. You can then set spark logging (and any other spark props of interest):

spark.eventLog.enabled=true
spark.eventLog.dir=/tmp/spark.log
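If you also want to browse these event logs in a UI, one option is the standard Spark history server; a minimal sketch, assuming the event log directory configured above and a plain Spark client install:

# make sure the event log directory exists
mkdir -p /tmp/spark.log
# point the history server at it and start it (the UI defaults to port 18080)
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/tmp/spark.log"
$SPARK_HOME/sbin/start-history-server.sh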

Running on the Cluster (YARN Mode)

Note: There is no dependency any more on Zookeeper in v8.

This is the configuration for the production environment. You can also use it locally if you are running a pseudo-distributed cluster.

Spark Application

Since some users extend the PDI core functionality with additional steps (plugins), e.g. from the Marketplace, not every PDI setup is the same. We have to make the essential libraries of our possibly customised PDI setup available to the Spark cluster by building a zip file which bundles all dependencies (libraries). If you have ever written a plain Scala or Java app and then created an uber jar, this will all sound quite familiar.

PDI comes with the Spark Application Builder Tool, which we will use to create this zip file (called pdi-spark-driver.zip). To be precise, the zip file has two essential contents:

  • data-integration: This looks pretty much like your local PDI client installation
  • pdi-spark-executor.zip: bundles all the required PDI libraries so that they can be distributed on the Spark cluster.

The idea is that you copy pdi-spark-driver.zip to an edge node on your cluster and extract it there.

If you are just planning to do local development, you can unzip pdi-spark-driver.zip locally as well.

And this is how you create the zip file on Linux:

Note: There is currently still an open bug: the Spark App Builder script expects the parent folder to be called data-integration. When you unzip the downloaded PDI zip file, the extracted folder is called data-integration by default; however, if you keep more than one version on your system, you may have renamed this folder. If you are one of these people, you will have to rename the folder back temporarily.

# if you renamed the PDI root folder, change it back to its original name temporarily
mv pdi-ce-8.0 data-integration
cd data-integration
sh ./spark-app-builder.sh -outputLocation /tmp
cd ..
mv data-integration pdi-ce-8.0
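To sanity-check the result, you can list the archive contents (assuming the default output location used above):

# the bundle should contain a data-integration folder and pdi-spark-executor.zip
unzip -l /tmp/pdi-spark-driver.zip | head -n 20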

Configuration

If you intend to use AEL with Spark and YARN on the cluster, the following additional steps are necessary:

Add the following variables to your .bashrc (for example):

export HADOOP_CONF_DIR=<your-value>
export SPARK_HOME=<your-value>
export PDI_AEL_DAEMON_HOME=<your-value>
export HDFS_SPARK_EXECUTOR_LOCATION=<your-value>

Copy pdi-spark-driver.zip to the edge node and unzip the file. As mentioned before, you will find a data-integration folder and the pdi-spark-executor.zip inside the extracted folder.
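For example (hostname and target directory are purely illustrative):

# copy the driver bundle to the edge node and unpack it there
scp /tmp/pdi-spark-driver.zip dsteiner@edgenode:/opt/pentaho/
ssh dsteiner@edgenode 'cd /opt/pentaho && unzip pdi-spark-driver.zip'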

Adjust the configuration of the AEL Daemon by adjusting this file:

data-integration/adaptive-execution/config/application.properties

Change the following properties:

property       value                                                                    environment variable
sparkHome      path to your local Spark Client                                          SPARK_HOME
sparkApp       path to your AEL Daemon data-integration folder                          PDI_AEL_DAEMON_HOME
hadoopConfDir  path to the directory containing the Hadoop config files (*-site.xml)    HADOOP_CONF_DIR
hadoopUser     user id which submits the PDI Spark job (only applicable if you are not using security)
sparkMaster    yarn
assemblyZip    hdfs:$HDFS_SPARK_EXECUTOR_LOCATION                                       HDFS_SPARK_EXECUTOR_LOCATION

My configuration looked like this (only partially shown; local setup with a pseudo-distributed cluster):

# Hadoop Configuration
# ----------------------------------------------------------------------------------------------------
# Directory where *-site.xml files reside for Hadoop Configuration
#hadoopConfDir=${HADOOP_CONF_DIR}
hadoopConfDir=/home/dsteiner/apps/hadoop-2.8.0/etc/hadoop

# Hadoop User which will run job
hadoopUser=dsteiner

# Spark Configuration
# ----------------------------------------------------------------------------------------------------
# Location of Apache Spark Client distribution
#sparkHome=${SPARK_HOME}
sparkHome=/home/dsteiner/apps/spark-2.1.0-bin-hadoop2.7-yarn
#sparkHome=/home/dsteiner/apps/spark-2.2.0-bin-hadoop2.7-standalone

# Spark Master
# Where spark will run: local / yarn
# Note, you can also simulate multiple executors by using this notation (i.e. 2 executors):  local[2]
#sparkMaster=local[2]
sparkMaster=yarn
# Deploy Mode
# This property is only used when the master is set to `yarn`.
# client  - Driver will run on the daemon machine; but the executors will run in Yarn
# cluster - Entire application will be run in cluster (Not supported yet)
sparkDeployMode=client

# AEL Spark Properties
# ----------------------------------------------------------------------------------------------------
# Main PDI Driver location (can be pdi-ee-client or pdi-spark-driver)
#sparkApp=${PDI_AEL_DAEMON_HOME}
sparkApp=/home/dsteiner/apps/pdi-ce-8.0

sparkAppClass=org.pentaho.pdi.spark.driver.SparkWebSocketMain

# Spark Assembly Zip
# This is the zip file of the PDI Assembly that will be used on the executors.
# It is recommended to use an HDFS reference, which helps with performance when launching the application.
assemblyZip=hdfs:/opt/pentaho/pdi-spark-executor.zip

Next create a directory called kettleConf inside your Spark Client’s root directory:

cd $SPARK_HOME
mkdir kettleConf

Unzip pdi-spark-driver.zip (if you have not done so already) and copy the contained pdi-spark-executor.zip to HDFS. This HDFS location is referred to as HDFS_SPARK_EXECUTOR_LOCATION.

cd /tmp
unzip pdi-spark-driver.zip
hdfs dfs -put ./pdi-spark-executor.zip /opt/pentaho

Note: If you’ve previously uploaded a PDI Spark Executor to HDFS, make sure you remove it first.
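You can verify the upload like so (using the example HDFS location from above):

hdfs dfs -ls /opt/pentaho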

Next start the AEL Daemon like so:

cd $PDI_AEL_DAEMON_HOME
sh ./daemon.sh
# or start as a background process
sh ./daemon.sh start
# check the status if in background mode
./daemon.sh status

YARN Resource Manager UI and YARN Command Line Client

When you run your PDI jobs via AEL Spark, you have at least 3 ways to check what’s going on:

  • PDI logs: Very conveniently, the PDI logs also show you the work that is going on on the Spark cluster.
  • Yarn Resource Manager Web UI: Head over to http://localhost:8088/cluster (or similar) and check what’s going on (you can also get hold of the logs here):

  • Yarn Command Line Client:
$ yarn application -list -appStates ALL
17/11/22 22:30:06 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
Total number of applications (application-types: [] and states: [NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED]):1
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress                       Tracking-URL
application_1511385464282_0001	org.pentaho.pdi.spark.driver.SparkWebSocketMain	               SPARK	  dsteiner	   default	          FINISHED	         UNDEFINED	           100%	                                N/A

Then you can get the log by specifying the application id that the previous command returned:

yarn logs -applicationId application_1511385464282_0001 | less

Securing AEL Spark

Take a look at the info provided here.

Steps that do not run in parallel

A list of EE-supported steps that cannot run in parallel and hence have to be executed on a single node can be found in:

<pdi-root>/system/karaf/etc/org.pentaho.pdi.engine.spark.cfg

All the data will be moved to this single node (the Spark client) and the process will be executed in a single thread. Since the Spark client can currently only run outside the cluster on an edge node, for some steps this means that all the data has to leave the cluster, travel to the edge node for processing, and then possibly travel back to the cluster.

You can extend this list with any other plugins you use that cannot run in parallel (a quick sketch of inspecting and extending the file follows after the list).

  • Abort
  • AccessInput
  • AccessOutput
  • AutoDoc
  • BlockingStep
  • BlockUntilStepsFinish
  • ClosureGenerator
  • ConcatFields
  • CsvInput
  • CubeInput
  • CubeOutput
  • DataGrid
  • Delete
  • Denormaliser
  • DetectLastRow
  • ExcelInput
  • ExcelOutput
  • FieldsChangeSequence
  • FixedInput
  • Flattener
  • Flatterner
  • GetFileNames
  • GetFilesRowsCount
  • GetRepositoryNames
  • GetSubFolders
  • GetTableNames
  • getXMLData
  • GroupBy
  • HL7Input
  • InsertUpdate
  • JmsInput
  • JmsOutput
  • JsonInput
  • JsonOutput
  • LDAPInput
  • LDAPOutput
  • LDIFInput
  • LoadFileInput
  • MailInput
  • MemoryGroupBy
  • MondrianInput
  • MQInput
  • MQOutput
  • OlapInput
  • ParallelGzipCsvInput
  • PentahoReportingOutput
  • PropertyInput
  • PropertyOutput
  • RandomCCNumberGenerator
  • RandomValue
  • RowGenerator
  • RssInput
  • RssOutput
  • S3CSVINPUT
  • S3FileOutputPlugin
  • SalesforceDelete
  • SalesforceInput
  • SalesforceInsert
  • SalesforceUpsert
  • SAPINPUT
  • SASInput
  • Sequence
  • ShapeFileReader
  • SQLFileOutput
  • SynchronizeAfterMerge
  • SystemInfo
  • TableInput
  • TableOutput
  • TypeExitExcelWriterStep
  • TypeExitGoogleAnalyticsInputStep
  • Unique
  • UniqueRowsByHashSet
  • Update
  • XBaseInput
  • XMLInputStream
  • XMLOutput
  • YamlInput
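A quick way to inspect (and later extend) this list, a minimal sketch assuming the step IDs are kept in that file as a comma-separated list (check the file itself for the exact format):

# view the current list of steps forced onto a single node
cat <pdi-root>/system/karaf/etc/org.pentaho.pdi.engine.spark.cfg
# append your plugin's step ID to the comma-separated list in this file with a
# text editor, then restart the AEL Daemon for the change to take effect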

Jira Cases Filed

Update: PDI v8.1

  • Group By step runs on Spark natively now.
  • You can now add Spark properties directly to the application.properties file or pass them as transformation parameters, which makes customising Spark jobs easier.