Self-contained, layered data pipelines within Spark and Python using Luigi.

Using `pipeline.py` to submit your Spark app, you can run traditional Luigi task trees entirely within a PySpark context and handle the resulting task statuses via the YARN logs.
bin/install
- Tested against Hortonworks Hadoop 2.6.0, Python 3.5, Spark 2.0.1, and CentOS:
  - Hadoop 2.6.0.2.2.0.0-2041
  - Subversion [email protected]:hortonworks/hadoop.git -r 7d56f02902b436d46efba030651a2fbe7c1cf1e9
- Requires a standard-library Python 2 on Linux for the install
cd ~/
git clone this-repo
- Fill out your `luigi.cfg`. An example is included in this repo.
python setup.py install
- Follow the command prompts to accept the license and the default install path
- Answer yes to "Do you wish the installer to prepend the Anaconda3 install location to PATH in your /home/user.../.bashrc ?"
- To place your `luigi.cfg` into production use on HDFS, run:
hadoop fs -put $DEFAULT_LUIGI_CONFIG_PATH_LOCAL $DEFAULT_LUIGI_CONFIG_PATH_HDFS
- Start the Luigi daemon task monitor:
luigid --background --pidfile ../pidfile --logdir ../logs --state-path ../statefile
- Then monitor your tasks at `host:8082`
- To install this repo as a package and overwrite an existing installation, run the following (this needs to be re-run when deploying updates to this repo):
python setup.py install
- `bin/beeline`: connects to the Hive server via beeline to run interactive queries
- `bin/install`: installs this package and sets everything up
- `bin/luigid`: starts the Luigi daemon process
- `bin/prod`: sets up this repo ready for production usage
- `bin/pyenv`: recreates the required Python env
- `bin/pyspark`: starts the PySpark shell for interactive usage/debugging
- `bin/pysparksql`: executes arbitrary SQL using Spark or Hive contexts (no results returned)
- `bin/workflow`: executes the user-specified workflow/task
- `bin/kill`: kills apps using `-s SEARCHSTRING -t STATE -q QUEUE`
- `bin/kill_app_after_x_min`: kills all apps running for more than x minutes
- `bin/search_log`: searches YARN logs for the given search string and returns the given log type
- `bin/search_app`: searches running apps for the given search string and returns the app's status
- Example:
source ~/.bashrc
export LUIGI_CONFIG_PATH=~/this-repo/luigi.cfg
cd this-repo/workflows
python3 -m luigi --module postgres_replication Run --workers 10
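The same workflow can also be kicked off programmatically with Luigi's stock API, which is presumably the kind of call the `build`/`run` helpers in `interface.py` (see the module reference below) wrap; a minimal sketch:

```python
# Illustrative only: programmatic equivalent of the command-line example above.
import luigi

from postgres_replication import Run  # workflows/postgres_replication.py

# Build and run the task tree against the central luigid scheduler (host:8082),
# mirroring `python3 -m luigi --module postgres_replication Run --workers 10`.
luigi.build([Run()], workers=10, local_scheduler=False)
```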
- Kicking off jobs can be scheduled/managed using `clock.py`, `/workflows`, or via a shell script. We typically use `/workflows` for our production task workflows.
- A workflow determines its dependencies and then kicks off the necessary `pipeline.py SparkSubmitTask` jobs
- Example: `workflows/postgres_replication.py`
  - Connects to the Postgres db, determines, based on table size, which tables will be replicated incrementally and which from scratch, then launches the `postgres.py Table` Spark job via `pipeline.py SparkSubmitTask` for each table required on the import (see the sketch below)
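A minimal sketch of what such a workflow module might look like. This is illustrative only: the table list is hard-coded, and the keyword parameters passed to `pipeline.SparkSubmitTask` are assumptions about its Luigi parameters, not the actual `postgres_replication.py` code.

```python
# workflows/example_replication.py -- illustrative sketch, not the real workflow.
import luigi

import pipeline  # this repo's pipeline.py (SparkSubmitTask)


class Run(luigi.WrapperTask):
    """Fan out one Spark submission per table to replicate."""

    def requires(self):
        # The real workflow derives this list (and the incremental vs. full
        # decision) by querying table sizes in Postgres; hard-coded here.
        for table in ["users", "orders", "events"]:
            # Assumes pipeline.SparkSubmitTask exposes the app and its
            # arguments as Luigi parameters -- an assumption for illustration.
            yield pipeline.SparkSubmitTask(
                app="postgres.py",
                app_options=["Table", "--table", table],
            )
```

Such a module is then launched exactly like `postgres_replication` in the example above.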
- `clock.py` is a pure-Python alternative to cron
- Run `python clock.py` to start all jobs defined in `clock.py` (a sketch of a job definition follows the screen commands below)
- [screen](https://www.gnu.org/software/screen/manual/screen.html) can be used to manage the `clock.py` application session (Ctrl-C will drop you into the running `clock.py` process, where you can dynamically schedule or clear jobs)
# reattach to a detached screen session
screen -r
# list all screen sessions
screen -ls
# kill screen session 10031.pts-50.host1
screen -S 10031.pts-50.host1 -p 0 -X quit
# connect to existing screen session 10031.pts-50.host1
screen -d -r 10031.pts-50.host1
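What the job definitions in `clock.py` might look like, assuming `scheduler.SafeScheduler` exposes a `schedule`-library-style fluent API (see `scheduler.py` in the module reference below); the schedule time and the method chain are assumptions for illustration.

```python
# clock.py -- illustrative sketch only; the SafeScheduler API shown here
# (every().day.at().do(), run_pending()) is an assumption about scheduler.py.
import subprocess
import time

from scheduler import SafeScheduler  # this repo's scheduler.py


def replicate_postgres():
    # Launch the documented workflow exactly as in the manual example above.
    subprocess.run(
        ["python3", "-m", "luigi", "--module", "postgres_replication",
         "Run", "--workers", "10"],
        check=False,  # don't raise here; a failed run is visible in luigid/YARN
    )


scheduler = SafeScheduler()
scheduler.every().day.at("02:00").do(replicate_postgres)  # hypothetical schedule

while True:
    scheduler.run_pending()
    time.sleep(60)
```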
cd ~/this-repo/workflows/
export LUIGI_CONFIG_PATH=../luigi.cfg
python spark_sql.py test_master
- Testing the performance of a job can be done using sparklint by submitting the Spark job with:
--conf spark.extraListeners=com.groupon.sparklint.SparklintListener
--packages com.groupon.sparklint:sparklint-spark201_2.11:1.0.4
- Then open a browser and navigate to your driver node's port 23763
- NOTE: Deploying any major change requires recreating the `PythonENV` so that spark-utils are available within any given Spark context:
python pipeline.py PythonENV
- Tests go in `tests/`
- To execute a test file, run `python module_test.py`
- Handle your test paths/imports using `context.py`
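For reference, a `context.py` in this kind of layout is typically just a small path shim so tests can import the repo's modules without installing them first; a minimal sketch (the modules imported at the bottom are assumptions, not the repo's exact layout):

```python
# tests/context.py -- minimal sketch of the common path-shim pattern.
import os
import sys

# Make the repo root importable no matter where the test runner is invoked.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import interface  # noqa: E402  (example import; adjust to the repo's modules)
import pipeline   # noqa: E402
```

A test module then does `from context import pipeline` (or simply `import context`) before its other imports.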
- Luigi Task Layer 1: `pipeline.py` submits the Spark job
- Luigi Task Layer 2: Luigi runs the `postgres.py Table` task within the Spark cluster and reports completion time, errors, and status
- Luigi Task Layer 1: `pipeline.py` checks the YARN logs to make sure the task succeeded
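To make the layering concrete, here is a rough sketch of the Layer 1 shape. It uses Luigi's stock `luigi.contrib.spark.SparkSubmitTask` for illustration rather than this repo's `pipeline.py` class, and the `master`/`deploy_mode` values are assumptions.

```python
# Illustrative Layer 1 sketch using Luigi's built-in SparkSubmitTask; not the
# repo's pipeline.py implementation, just the general shape of the pattern.
import luigi
from luigi.contrib.spark import SparkSubmitTask


class ReplicateTable(SparkSubmitTask):
    """Layer 1: submit postgres.py to YARN. The Luigi task tree that runs
    inside that PySpark app is Layer 2."""

    table = luigi.Parameter()

    app = "postgres.py"      # the PySpark application to submit
    master = "yarn"          # assumed cluster manager
    deploy_mode = "cluster"  # assumed deploy mode

    def app_options(self):
        # Arguments handed to the Layer 2 task running inside Spark.
        return ["Table", "--table", self.table]
```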
- setup.py
  - `class Error`: handles general exceptions in this module
  - `class Python`: by default, installs Anaconda Python 3.5.2
- interface.py
  - `read_*_config`: reads local or HDFS config
  - `port_is_open`: checks if the scheduler is running
  - `get_task`: command-line parser to a `luigi.Task`
  - `build`: builds a Luigi workflow programmatically
  - `run`: thread-safe alternative to `build`
  - `build_and_run`: wraps `get_task` and `run`
  - class decorators for custom logic to handle task statuses
- scheduler.py
  - `Scheduler`: Python implementation of a cron alternative
  - `SafeScheduler`: the scheduler loop will not fail if a job fails
  - `Job`: handles the actual execution of a job
- google_sheet.py
  - `class Sheet`: writes all contents of one tab from a Google Sheet to a Hive table
  - `class SheetToHDFS`: writes all contents of one tab from a Google Sheet to an HDFS path folder
- hdfs.py
  - `class Directory`: HDFS directory utility
  - `class File`: HDFS file utility (i.e. create/delete/read/write/describe)
- pipeline.py
  - `class PythonENV`: creates a Spark-ready Python env
    - inputs:
      - `packages`: list of Python packages to be installed by pip
      - `packages_conda`: list of Python packages to be installed by conda
      - `python_version`: e.g. 3.5
      - `env_name`: name of the env that is created (default = `py3spark_env`)
      - `python_install`: if `true`, then the class `PythonInstall` is run before creating the env
    - output: the resulting env.zip will be in the `~/anaconda3/envs/` directory
  - `class SparkSubmitTask`: given inputs, Spark parameters, and a job, attempts to submit it for you; an abstraction of `spark-submit`
- postgres.py
  - `class Table`: replicates a Postgres table to a Hive table (incrementally or from scratch)
  - `class Query`: creates a Hive table based on the results of a Postgres query
  - `class QueryToHDFS`: replicates a Postgres table to an HDFS folder path
- hive.py
  - `HiveQueryTask`: uses beeline to run a query