- Install the GCP Service Broker, available on Pivotal Network (source is on GitHub)
- Java 8 JDK installed
- Git client installed
- Using the Git client, clone this repo
- Change into this newly created directory: `cd ./pcf-gcp-retail-demo`
Spring Cloud Data Flow (SCDF) is the foundation of the data flow through this system. The SCDF server orchestrates the data streams, which are composed of modular building blocks: each block is a Source, a Processor, or a Sink. A large set of out-of-the-box components is available and, since they are Spring Boot apps, it is easy to build a customized module.
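To illustrate how lightweight such a module can be, here is a minimal sketch of a custom Processor built with Spring Cloud Stream; the class name and the trivial transformation are illustrative only, not code from this repo:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

// Minimal custom Processor module: it receives a payload on its input channel,
// transforms it, and sends the result to its output channel. The binder
// (RabbitMQ here) is supplied by the classpath and configuration.
@SpringBootApplication
@EnableBinding(Processor.class)
public class SampleProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleProcessorApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String transform(String payload) {
        return payload.toUpperCase();
    }
}
```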
- Download the SCDF server and client JAR files, as documented here
- Configure the manifest
- Ensure RabbitMQ, MySQL, and Redis tiles are installed (Ops Manager)
- Create service instances of each of these, using `cf cs ...`
- Push this app to PCF on GCP
- Ensure it is running
- Access the SCDF Dashboard, at `https://dataflow-server.YOUR_PCF_INSTALL.DOMAIN/dashboard/`
- Start the SCDF client you downloaded in a previous step: `java -jar spring-cloud-dataflow-shell-1.1.2.RELEASE.jar`
Initially, configure a simple stream to illustrate the following data flow:
- A social media API client (see 3, 4, 5 in the diagram; a sketch of such a client appears after this list) periodically POSTs data to ...
- an SCDF HTTP Source (item 6 in the diagram),
- which is routed through a custom SCDF Processor (item 8 in the diagram), where data normalization and enrichment are done.
- Finally, the stream terminates with the SCDF Sink component (item 17 in the diagram).
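Such a client could be as simple as the following sketch, which POSTs a JSON payload to the stream's HTTP Source endpoint. The class name is hypothetical, and the URL follows the pattern used later in this walkthrough:

```java
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

// Illustrative social media client: each POST to the SCDF HTTP Source becomes
// one message flowing through the stream.
public class SocialMediaClient {

    private static final String HTTP_SOURCE_URL =
            "http://dataflow-server-hf30QYI-socialmedia-http.YOUR_PCF_INSTALL.DOMAIN";

    public static void main(String[] args) {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        String payload = "{\"date-time\": \"02/24/17 02:04:23\"}";
        new RestTemplate().postForEntity(HTTP_SOURCE_URL,
                new HttpEntity<>(payload, headers), String.class);
    }
}
```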
- Build the Processor project: `( cd ./transform-proc/ && ./mvnw clean package )`
- Upload the resulting JAR, `./transform-proc/target/transform-proc-0.0.1-SNAPSHOT.jar`, into a Cloud Storage bucket, so SCDF is able to access it.
- Build the Sink project: `( cd ./log-sink/ && ./mvnw clean package )`
- Upload its JAR, `./log-sink/target/log-sink-0.0.1-SNAPSHOT.jar`, to Cloud Storage as well.
- Register the applications using the SCDF Dashboard (see above). To import the out-of-the-box apps, click the "APPS" tab,
- then click "Bulk Import Applications",
- then, under "Out-of-the-box Stream app-starters", click the "Action" button associated with the "Maven based Stream Applications with RabbitMQ Binder" row.
- To register the two custom-built apps that were uploaded to Cloud Storage, go back to the "APPS" tab,
- then click the "+ Register Application(s)" button and populate the fields, clicking the "+" icon under "Actions" after you enter the first one to provide a second set of inputs. Check the "Force" box, and note that this demo names them xform-proc and log-ps.
- Still within the SCDF Dashboard, click the "STREAMS" tab.
- Click "Create Stream"
- From the available modules, click the "http" module and drag it to the canvas area.
- Repeat this for "xform-proc" and "log-ps" modules.
- With the three modules now on the canvas, use the mouse to create connections between them.
- Click "Create Stream", and fill in values for the stream name (we use "socialmedia" here).
- Click the "Deploy stream(s)" check box.
- Click "Create".
- Click the "Definitions" tab, which should show a view similar to this:
- Push the app: `( cd ./mock-source/ && cf push --no-start )`
- Using `cf apps`, note the value in the "urls" column for the app whose name ends in "-http".
- Now, create a service named "http-hub" using a URI based on that value: `cf cups http-hub -p '{"uri": "http://dataflow-server-hf30QYI-socialmedia-http.YOUR_PCF_INSTALL.DOMAIN"}'`
- Bind this newly pushed app to this service instance: `cf bs mock-source http-hub`
- Finally, start this app: `cf start mock-source`
- Find the URL for this "mock-source" app, from the output of `cf apps`
- Simulate the flow of events through this endpoint:

```
while true; do curl http://mock-source.YOUR_PCF_INSTALL.DOMAIN/datetime ; sleep $( perl -e 'print int(1 + 15*rand()) . "\n"' ) ; done
```
- Tail the logs for the sink, the app whose name ends in "-log-ps": `cf logs dataflow-server-hf30QYI-socialmedia-log-ps`
Every so often (at intervals ranging from 1 to 15 seconds), a log entry should appear:

```
2017-02-26T06:25:51.15-0500 [APP/PROC/WEB/0]OUT 2017-02-26 11:25:51.157 INFO 18 --- [c.socialmedia-1] log.sink : {"date_time": "02/26/17 11:25:51", "source": "mock", "days_until_message": "9 days 'til GCP NEXT"}
```
- "mock-source" represents the yet-to-be-implemented social media data source, and we can POST data to it. As envisioned, there would be several such components, all falling within this "DATA" area of the diagram, one per social media platform. The way this is decoupled, by having only a REST interface with the SCDF HTTP Hub, allows for developers to build each of these using whichever technology is most appropriate, the best match for the skills of the team and the social media API.
- SCDF HTTP Hub is just an out-of-the-box SCDF app. Its role here is simply to accept inputs from any of the disparate sources, and put these into the data stream.
- SCDF Processor, so far, is accepting input, which so far is just the simple JSON
string representing the current date and time, as defined in mock-source:
{"date-time": "02/24/17 02:04:23"}
. It parses the JSON message, computes the number of days between the given date and the date of the GCP NEXT, defined within./transform-proc/src/main/resources/application.properties
, enriches that data stream based on this computation, and emits the result into its outbound channel. - SCDF Sink simply takes this input and logs it.
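The following sketch shows roughly what that transformation might look like. The property name, date format, and field handling are assumptions made for illustration, not code copied from transform-proc; the class would sit inside a Spring Boot app like the Processor sketched earlier:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
public class TransformProcessor {

    // Hypothetical property holding the GCP NEXT date, read from application.properties.
    @Value("${gcp.next.date:03/08/17 09:00:00}")
    private String gcpNextDate;

    private final ObjectMapper mapper = new ObjectMapper();
    private final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("MM/dd/yy HH:mm:ss");

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String enrich(String payload) throws Exception {
        ObjectNode json = (ObjectNode) mapper.readTree(payload);
        LocalDateTime given = LocalDateTime.parse(json.get("date-time").asText(), fmt);
        LocalDateTime next = LocalDateTime.parse(gcpNextDate, fmt);
        long days = ChronoUnit.DAYS.between(given, next);
        // Add the enrichment that shows up in the sink's log output.
        json.put("days_until_message", days + " days 'til GCP NEXT");
        return mapper.writeValueAsString(json);
    }
}
```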
- Stop that curl command loop
- Add the new app, the Data Science Interrogator (item 9 in the diagram): `( cd ./ds_app_09/ && cf push )`
- Use `cf apps` to find the URL for this new app.
- Update `transform-proc/src/main/resources/application.properties` with this URL value.
- Rebuild transform-proc, then upload it as before.
- Using the SCDF Dashboard, delete the existing stream ("Destroy")
- Re-create that stream, using "Create Stream", as before
- Resume that curl command loop
- Resume tailing the logs for the app whose name ends with "-log-ps"
Now the log entries appearing here should show new fields in the JSON, similar to this:

```
2017-02-26T06:25:51.15-0500 [APP/PROC/WEB/0]OUT 2017-02-26 11:25:51.157 INFO 18 --- [c.socialmedia-1] log.sink : {"date_time": "02/26/17 11:25:51", "sentiment": {"magnitude": 0.9, "score": 0.4}, "source": "mock", "days_until_message": "9 days 'til GCP NEXT"}
```
- With this change, the SCDF Processor (8) is able to interact with the Data Science Interrogator (9).
- For now, pending a fleshed-out version of (9), it just adds an example of what the Google Cloud Language API would provide for sentiment: `"sentiment": {"magnitude": 0.9, "score": 0.4}`
- Once that REST interaction is complete, (8) simply emits the enriched message back onto the message queue for downstream processing (see the sketch below).
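The sketch below shows one hypothetical way the processor could perform that REST interaction and merge the result into the message. The property name, endpoint path, and request/response shapes are assumptions, not details taken from ds_app_09 or transform-proc:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

@Component
public class SentimentEnricher {

    // Hypothetical property set to the URL of the ds_app_09 instance found via `cf apps`.
    @Value("${interrogator.url:http://ds-app-09.YOUR_PCF_INSTALL.DOMAIN}")
    private String interrogatorUrl;

    private final RestTemplate restTemplate = new RestTemplate();
    private final ObjectMapper mapper = new ObjectMapper();

    // Called from the processor's enrichment step before the message is emitted.
    public ObjectNode addSentiment(ObjectNode message, String text) throws Exception {
        // Expecting a response such as {"magnitude": 0.9, "score": 0.4}.
        String response = restTemplate.postForObject(interrogatorUrl + "/sentiment",
                text, String.class);
        message.set("sentiment", mapper.readTree(response));
        return message;
    }
}
```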
- Document showing how to create Spring Cloud Stream components bound to Google PubSub
- Spring Initializr for Stream Apps
- Read this if SCDF Server runs out of disk space.
- We may want to look at validating the inputs coming from the various sources. Something like this, as is done with GitHub "webhooks", might work (this fragment assumes a Flask request context and a local `log` helper):

```python
import hmac
from hashlib import sha1
from hmac import compare_digest

from flask import abort, request

# Validate signature against GitHub Webhook secret, only if environment variable
# GIT_WEBHOOK_SECRET is set
global git_webhook_secret
if git_webhook_secret:
    log("Comparing SHA1 digests for payload")
    if type(git_webhook_secret) == unicode:
        git_webhook_secret = git_webhook_secret.encode()
    signature = request.headers.get("X-Hub-Signature").split('=')[1]
    mac = hmac.new(git_webhook_secret, msg=request.data, digestmod=sha1)
    if compare_digest(u'{0}'.format(mac.hexdigest()), u'{0}'.format(signature)):
        log("Digests match -- proceeding")
    else:
        log("Digests don't match -- aborting")
        abort(403)
```
- One possibly useful Instagram API client
- Google Language API, syntax example