Schedule a Dataflow job – full terraform example
First blog post in two and a half years; there’s been a whole pandemic since then!!
I’ve been messing around with GCP Dataflow recently and using GCP Cloud Scheduler to get my Dataflow batch jobs to run. I had a particularly tricky IAM problem to solve to make this work (read more on Stack Overflow if you’re interested), and to demonstrate it I built a repro using terraform and made it available at https://github.com/jamiet-msm/dataflow-scheduler-permission-problem.
Now that I’ve solved the problem (read my answer on the Stack Overflow post if interested), this repo serves as a fully fledged example of how to create a working Dataflow job that is scheduled to run using Cloud Scheduler, all created using terraform. It takes care of all the prerequisites, such as:
- a service account
- IAM grants
- a BigQuery table that the Dataflow job loads data into
- sample data
The README explains how to deploy and subsequently destroy it; you don’t even need terraform installed.
Head over to the repo if this sounds useful and check it out.
Use gcloud to list all project roles that a service account is a member of
In the category of “stuff I’ll need to know later”, here is how to use the gcloud command-line tool to get a list of all of the project roles that a service account is a member of.
SERVICE_ACCOUNT=my-service-account@my-gcp-project.iam.gserviceaccount.com
GCP_PROJECT=my-gcp-project
gcloud projects get-iam-policy $GCP_PROJECT \
--flatten="bindings[].members" \
--format='table(bindings.role)' \
--filter="bindings.members:$SERVICE_ACCOUNT"
Running that for one of my service accounts provided this output:
ROLE
roles/cloudsql.client
roles/compute.instanceAdmin
roles/compute.storageAdmin
roles/dataproc.editor
roles/dataproc.worker
roles/logging.logWriter
roles/logging.viewer
roles/monitoring.metricWriter
roles/storage.objectAdmin
roles/storage.objectViewer
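If you’d rather post-process the policy yourself (in a script, say), here’s a minimal Python sketch. It assumes you’ve captured the output of gcloud projects get-iam-policy $GCP_PROJECT --format=json as a string; the function name roles_for_member and the example policy are my own inventions. Members in the policy JSON carry a type prefix, hence serviceAccount:, and unlike the --filter above this does an exact match on the member string.

```python
import json
from typing import List


def roles_for_member(policy_json: str, member: str) -> List[str]:
    """Return the sorted roles whose bindings include `member`.

    `policy_json` is assumed to be the output of:
      gcloud projects get-iam-policy PROJECT --format=json
    """
    policy = json.loads(policy_json)
    return sorted(
        binding["role"]
        for binding in policy.get("bindings", [])
        if member in binding.get("members", [])
    )


# Hypothetical example policy, trimmed to the fields used above.
example_policy = json.dumps({
    "bindings": [
        {"role": "roles/logging.logWriter",
         "members": ["serviceAccount:my-service-account@my-gcp-project.iam.gserviceaccount.com"]},
        {"role": "roles/owner", "members": ["user:someone@example.com"]},
    ]
})

print(roles_for_member(
    example_policy,
    "serviceAccount:my-service-account@my-gcp-project.iam.gserviceaccount.com",
))  # ['roles/logging.logWriter']
```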
Use awk to split a text file into multiple files based on some criteria
In the spirit of “when you learn something stick it in a blog post so you can find it later” here’s a solution to splitting a file into multiple files using the Unix/Linux command awk.
Here’s a demo script that dumps some sample data into a file and then uses awk to split it into multiple output streams:
echo "John,Bananas,London,10" > /tmp/input
echo "John,Apples,London,5" >> /tmp/input
echo "Jane,Bananas,London,2" >> /tmp/input
echo "John,Apples,Leeds,3" >> /tmp/input
echo "Jane,Bananas,Leeds,8" >> /tmp/input
echo
echo Jane buying Bananas
echo ===================
cat /tmp/input | awk -F, '$1=="Jane" && $2=="Bananas"'
echo
echo
echo John buying in London
echo =====================
cat /tmp/input | awk -F, '$1=="John" && $3=="London"'
echo
echo
echo Buying in Leeds
echo ===============
cat /tmp/input | awk -F, '$3=="Leeds"'
echo
echo
which when run returns this output:
$ . /tmp/demoawk.sh

Jane buying Bananas
===================
Jane,Bananas,London,2
Jane,Bananas,Leeds,8


John buying in London
=====================
John,Bananas,London,10
John,Apples,London,5


Buying in Leeds
===============
John,Apples,Leeds,3
Jane,Bananas,Leeds,8
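The demo writes each subset to stdout rather than to separate files. For comparison, here’s a minimal Python sketch of the file-splitting part, writing one output file per distinct value of a chosen column. The function name, the output directory and the choice of column (the city, index 2) are my assumptions, and it uses each value verbatim as a file name, so it only suits well-behaved data like this.

```python
import csv
from pathlib import Path


def split_by_column(input_path, output_dir, column_index):
    """Write each row of a headerless CSV to <output_dir>/<value>.csv, keyed on one column.

    Returns the sorted list of distinct values (one per output file).
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    outputs = {}  # column value -> (open file handle, csv writer)
    try:
        with open(input_path, newline="") as source:
            for row in csv.reader(source):
                key = row[column_index]
                if key not in outputs:
                    handle = open(output_dir / f"{key}.csv", "w", newline="")
                    outputs[key] = (handle, csv.writer(handle))
                outputs[key][1].writerow(row)
    finally:
        # Close every output file even if a row fails part-way through.
        for handle, _ in outputs.values():
            handle.close()
    return sorted(outputs)


# Usage, assuming /tmp/input was created by the demo script above:
# split_by_column("/tmp/input", "/tmp/by_city", 2)  # -> ['Leeds', 'London']
```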
Like I said, I wrote this for my own reference but if someone else finds it useful, so much the better! Thank you to my colleague Marcin Kulisz who helped me out with this.
JT
Running Spark on Ubuntu on Windows subsystem for Linux
In my day job at dunnhumby I’m using Apache Spark a lot, and so when Windows 10 gained the ability to run Ubuntu, a Linux distro, I thought it would be fun to see if I could run Spark on it. My earlier efforts in November 2016 were thwarted (something to do with enumerating network connections), so when Microsoft released the Windows 10 Creators Update I thought I’d give it another bash (pun intended). Happily, this time it worked, and in this blog post I’ll explain how I got it running in case anyone wants to do the same. If you skip the blurb and just run the steps, it should take you about 10 minutes if you’re quick.
Enable Windows subsystem for Linux
There are countless sites that will tell you how to do this (here’s one) but basically you need to turn on Windows Subsystem for Linux (Beta) in Control Panel->Windows Features:
then open up a PowerShell window and run
lxrun /install
which will install all the bits:
Once it’s installed, run
bash
to launch a Linux bash prompt.
Download and install Spark
Once at the prompt run the following commands:
#install Java runtime environment (JRE)
sudo apt-get install openjdk-8-jre-headless
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre
#download spark, visit https://spark.apache.org/downloads.html if you want a different version
#(older releases, including 2.1.0, are archived at https://archive.apache.org/dist/spark/)
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
#untar and set a symlink
sudo tar -xvzf spark-2.1.0-bin-hadoop2.7.tgz -C /opt
sudo ln -s spark-2.1.0-bin-hadoop2.7 /opt/spark
That’s it. Now to actually use it.
Run some Spark code
Run:
/opt/spark/bin/spark-shell
and you’ll be launched into the Spark REPL, which is actually the Scala REPL preloaded with some Spark stuff (Spark is written in Scala, hence it’s considered the canonical language for Spark development):
Let’s try the “Hello World” for Spark:
sc.parallelize(Array(1,2,3,4,5)).collect()
If it works you should see this:
scala> sc.parallelize(Array(1,2,3,4,5)).collect()
res5: Array[Int] = Array(1, 2, 3, 4, 5)
Not in itself particularly exciting, but given I’m running this on Windows and it actually works I think that’s pretty cool.
OK let’s do something more interesting.
var df = spark.read.json("/opt/spark/examples/src/main/resources/people.json")
df.show()
Cool! If you’re already familiar with Spark then you can stop here but if you want to know a bit more about its APIs, keep reading.
Now let’s synthesize a bigger dataset and get its cardinality:
for (a <- 1 until 10){
  df = df.union(df)
}
df.count()
A dataset of 1536 rows (the three rows in people.json doubled nine times, because 1 until 10 excludes 10) is a bit more interesting, and we can do a few things with it:
df.filter(df("age") > 21).count()
df.groupBy("name").count().show()
//combine a few functions together
//this fluent API approach is what I've come to love about Spark
df.withColumn("is over 21", df("age") > 21).limit(5).show()
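To sanity-check the numbers those calls should produce, here’s a plain-Python analogue (no Spark needed). It assumes people.json holds the three sample records that ship with Spark (Michael with no age, Andy aged 30, Justin aged 19), and it treats a missing age the way Spark’s filter treats null: the row is simply dropped. It mirrors the logic only; it is not how Spark executes it.

```python
from collections import Counter

# Assumed contents of /opt/spark/examples/src/main/resources/people.json
PEOPLE = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]


def self_union(rows, times):
    """Mimic repeated df = df.union(df): every pass doubles the row count."""
    for _ in range(times):
        rows = rows + rows
    return rows


# Scala's `1 until 10` excludes 10, so the loop body runs len(range(1, 10)) == 9 times.
rows = self_union(PEOPLE, len(range(1, 10)))

# df.filter(df("age") > 21).count(): a null age never satisfies the predicate.
over_21 = sum(1 for r in rows if r["age"] is not None and r["age"] > 21)

# df.groupBy("name").count()
by_name = Counter(r["name"] for r in rows)

print(len(rows), over_21, dict(by_name))
# 1536 512 {'Michael': 512, 'Andy': 512, 'Justin': 512}
```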
Notice that when the Spark shell started up it provided a link to the Spark UI:
Browse to that URL to view all the stats that Spark keeps about everything that it executes:
If you want to persist the data as a database table there is a simple saveAsTable() method that lets you do that, and the resulting table is then easy to consume using table():
val df = spark.read.json("/opt/spark/examples/src/main/resources/people.json")
df.write.saveAsTable("savetabledemo")
spark.table("savetabledemo").show()
And if fluent APIs aren’t your thing there’s still good old SQL:
spark.sql("select count(*) from savetabledemo").show()
Prefer Python to Scala?
If you prefer Python to Scala then you’ll be glad to know you can use that too. One thing you’ll need to do first, back at the bash prompt, is create a symlink named python somewhere on your PATH that points at the Python 3 interpreter (pyspark looks for python by default), then launch the Python REPL for Spark, pyspark:
sudo ln -s /usr/bin/python3.5 /usr/local/bin/python
/opt/spark/bin/pyspark
From there you can run the same operations as before, just using python syntax instead.
sc.parallelize([1,2,3,4,5]).collect()
df = sqlContext.read.json("/opt/spark/examples/src/main/resources/people.json")
df.show()
df.filter(df.age > 21).count()
df.groupBy("name").count().show()
df.withColumn("is over 21", df.age > 21).limit(5).show()
df.write.saveAsTable('savetabledemo_python')
sqlContext.table('savetabledemo_python').show()
sqlContext.sql('select count(*) from savetabledemo_python').show()
That’s probably enough for now. Hope this helps.
Post messages to a Microsoft Teams incoming webhook from behind a proxy using Python
The subject of this post is a bit of a mouthful but it’s going to do exactly what it says on the tin.
I’ve been playing with Microsoft Teams a lot over the past few days and I wanted to programmatically post messages to a channel on Microsoft Teams using the language I’m using most often these days, Python. In my case it was made complicated by the presence of my company’s proxy server; I figured out a way around it and thought I’d post the code here for anyone who might be interested.
First you’ll need to create an incoming webhook. I’ll assume that you know what that means otherwise why would you be reading this? I’ll also assume you’ve already gotten your webhook URL as per the instructions at https://msdn.microsoft.com/en-us/microsoft-teams/connectors#create-the-webhook.
After that you just need to edit the code below with the address of your proxy server (I can’t help you with that), a username to authenticate to your proxy server, and your webhook URL. You will also need to have the requests library installed (pip install requests).
Here’s the code:
import requests #learn about requests: http://docs.python-requests.org/en/master/
from requests.auth import HTTPProxyAuth
from getpass import getpass

proxyDict = {
    'http' : 'your.proxy.server:port',
    'https' : 'your.proxy.server:port'
}
webhook_url = "https://outlook.office.com/webhook/blahblahblah"
user = 'username'
password = getpass()
auth = HTTPProxyAuth(user, password)
json_payload = {'text' : 'hello world'}
requests.post(url=webhook_url, proxies=proxyDict, auth=auth, json=json_payload)
If it works you should see something like this posted to the channel for which you set up the incoming webhook:
Good luck!
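If installing requests isn’t an option, here’s a minimal sketch of the same idea using only Python’s standard library (urllib.request). This is a swap-in, not what I used above; the helper names are hypothetical, and it assumes your proxy accepts basic credentials embedded in the proxy URL, which urllib turns into a Proxy-Authorization header. It builds the request without sending it, so you can inspect it first.

```python
import json
import urllib.parse
import urllib.request


def build_proxy_opener(proxy_host_port, user, password):
    """Return an opener that routes HTTP and HTTPS traffic through an authenticating proxy."""
    # Percent-encode the credentials so characters such as '@' or ':' survive in the URL.
    creds = f"{urllib.parse.quote(user, safe='')}:{urllib.parse.quote(password, safe='')}"
    proxy_url = f"http://{creds}@{proxy_host_port}"
    return urllib.request.build_opener(
        urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
    )


def build_teams_request(webhook_url, text):
    """Build (but do not send) a POST carrying a simple {'text': ...} message payload."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage, with the same placeholders as the requests version above:
# from getpass import getpass
# opener = build_proxy_opener("your.proxy.server:port", "username", getpass())
# request = build_teams_request("https://outlook.office.com/webhook/blahblahblah", "hello world")
# with opener.open(request) as response:
#     print(response.status)
```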
PySpark starter for ten
I just threw this together and I’m putting it here mainly in case I need it later. It might come in handy for others too…
So you have a new Spark installation against a YARN cluster and you want to run something simple on it (akin to Hello World) to see if it does anything. Try copying and pasting this into your bash shell:
echo "from pyspark import SparkContext, HiveContext, SparkConf" > sparking.py
echo "conf = SparkConf().setAppName('sparking')" >> sparking.py
echo 'conf.set("spark.sql.parquet.binaryAsString", "true")' >> sparking.py
echo "sc = SparkContext(conf=conf)" >> sparking.py
echo "sqlContext = HiveContext(sc)" >> sparking.py
echo "l = [('Alice', 1)]" >> sparking.py
echo "rdd = sc.parallelize(l)" >> sparking.py
echo "for x in rdd.take(10):" >> sparking.py
echo "    print(x)" >> sparking.py
spark-submit --master yarn --deploy-mode cluster --supervise --name "sparking" sparking.py
If it runs you should see something like this at the end of the YARN log:
Log Type: stdout
Log Upload Time: Thu Jan 05 14:56:09 +0000 2017
Log Length: 13
('Alice', 1)
Creating a Spark dataframe containing only one column
I’ve been doing lots of Apache Spark development using Python (aka PySpark) recently, specifically Spark SQL (aka the dataframes API), and one thing I’ve found very useful to be able to do for testing purposes is create a dataframe from literal values. The documentation at pyspark.sql.SQLContext.createDataFrame() covers this pretty well; however, its examples create a dataframe containing more than one column, like so:
l = [('Alice', 1)]
sqlContext.createDataFrame(l).collect()
# returns [Row(_1=u'Alice', _2=1)]
sqlContext.createDataFrame(l, ['name', 'age']).collect()
# returns [Row(name=u'Alice', age=1)]
For simple testing purposes I wanted to create a dataframe that has only one column so you might think that the above code could be amended simply like so:
l = [('Alice')]
sqlContext.createDataFrame(l).collect()
sqlContext.createDataFrame(l, ['name']).collect()
but unfortunately that throws an error:
TypeError: Can not infer schema for type: <type 'str'>
The reason is simple: ('Alice', 1) returns a tuple whereas ('Alice') returns a string.
type(('Alice',1)) # returns tuple
type(('Alice')) # returns str
The latter causes an error because createDataFrame() cannot infer a schema from plain strings; it expects each element to be row-like, such as a tuple, list, Row or dict.
There is a very easy fix which will be obvious to any half-decent Python developer; unfortunately that’s not me, so I didn’t stumble on the answer immediately. It’s possible to create a one-element tuple by including a trailing comma, like so:
type(('Alice',)) # returns tuple
hence the earlier failing code can be adapted to this:
l = [('Alice',)]
sqlContext.createDataFrame(l).collect()
# returns [Row(_1=u'Alice')]
sqlContext.createDataFrame(l, ['name']).collect()
# returns [Row(name=u'Alice')]
It took me far longer than it should have done to figure that out!
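The tuple pitfall can be checked without Spark at all. This plain-Python sketch (the helper name as_single_column_rows is my own) shows the difference and wraps a list of plain strings into one-element tuples, which is the shape createDataFrame() needs for a single inferred column.

```python
def as_single_column_rows(values):
    """Wrap each value in a one-element tuple, e.g. 'Alice' -> ('Alice',)."""
    return [(value,) for value in values]


# Parentheses alone don't make a tuple; the trailing comma does.
print(type(("Alice")).__name__)   # str
print(type(("Alice",)).__name__)  # tuple

rows = as_single_column_rows(["Alice", "Bob"])
print(rows)  # [('Alice',), ('Bob',)]

# With a Spark context available (not needed to run this snippet):
# sqlContext.createDataFrame(rows, ['name']).collect()
```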
Here is another snippet that creates a dataframe from literal values without letting Spark infer the schema (behaviour which, I believe, is deprecated anyway):
from pyspark.sql.types import *
schema = StructType([StructField("foo", StringType(), True)])
l = [('bar1',),('bar2',),('bar3',)]
sqlContext.createDataFrame(l, schema).collect()
# returns: [Row(foo=u'bar1'), Row(foo=u'bar2'), Row(foo=u'bar3')]
or, if you don’t want to use the one-element tuple workaround that I outlined above and would rather just pass a list of strings:
from pyspark.sql.types import *
from pyspark.sql import Row
schema = StructType([StructField("foo", StringType(), True)])
l = ['bar1','bar2','bar3']
rdd = sc.parallelize(l).map(lambda x: Row(x))
sqlContext.createDataFrame(rdd, schema).collect()
# returns [Row(foo=u'bar1'), Row(foo=u'bar2'), Row(foo=u'bar3')]
Happy sparking!
Tax analysis using your SA302 statement
HMRC will, if you ask them, send you a form called SA302 which shows your tax calculation for a given year. As I’m self-employed and thus have to submit a Self Assessment tax return every year I find this to be very useful.
To order the SA302 calculation, telephone 01619319070 or 08453000627; it should take a couple of minutes at most. You will need your National Insurance Number.
My latest SA302 arrived today and I’ve spent this evening compiling an Excel workbook containing all my SA302 data for the past five years (fun fun fun). I’ve found this to be very very useful so figured I should make the workbook available to anyone else in a similar position. The workbook can be accessed via your web browser (no Excel installation required) here: TaxAnalysis.xlsx. It already has some (fake) numbers filled in for years 2009-10 through to 2013-14:
all you need to do is replace the fake numbers with your own and voila, you’ll get some nice charts like these showing you useful information about how much you earned and how much tax you paid over those years:
Hope this is useful! If so, please do let me have some feedback. Thanks!
Charting London Borough of Hounslow expenditure
In November 2012 my family and I moved into the London Borough of Hounslow and, as I am expecting to be here for a very long time, I decided to avail myself of some information pertaining to how the council spends its money. All expenditure is published on the council website at Council spending over £500. It’s great that this information exists and is published; however, the format in which it is published isn’t particularly useful to folks like me who want to analyse and drill into the data in greater detail. What we get is a PDF file (rubbish) and a CSV file (better) per month:
Why is PDF rubbish? Because the data is static: we can’t explore it, reshape it or drill into it. The data is presented in whatever format the person who produced the PDF decides. This is all bad bad bad. CSV, on the other hand (which stands for comma-separated values), is better because it contains only raw data and there’s no pre-determined presentation of the data. One can take the monthly CSV files and collate them into a single view that allows exploration and comparison of the data, and that is exactly what I have done; I have taken all available data (from April 2012 onwards) and published it online at All London Borough of Hounslow Supplier expenditure over 500GBP since April 2012.
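The collating step is easy to script if you’d like to do your own analysis outside Excel. Here’s a minimal Python sketch, an illustration only (the workbook itself is in Excel), that stacks several monthly CSV files into one list of rows and totals spend per supplier. The column names Supplier Name and Amount, and the supplier names in the example, are placeholders I made up; check the headers in the council’s actual files and adjust.

```python
import csv
import io
from collections import defaultdict

# Placeholder column names: adjust to match the headers in the real monthly files.
SUPPLIER_COLUMN = "Supplier Name"
AMOUNT_COLUMN = "Amount"


def collate(monthly_files):
    """Stack the rows of several monthly CSV files (file-like objects) into one list of dicts."""
    rows = []
    for monthly_file in monthly_files:
        rows.extend(csv.DictReader(monthly_file))
    return rows


def parse_amount(text):
    """Turn strings such as '£1,250.00' into a float."""
    return float(text.replace("£", "").replace(",", "").strip())


def total_by_supplier(rows):
    """Sum the amount column per supplier across all collated months."""
    totals = defaultdict(float)
    for row in rows:
        totals[row[SUPPLIER_COLUMN]] += parse_amount(row[AMOUNT_COLUMN])
    return dict(totals)


# Hypothetical two-month example using in-memory files.
april = io.StringIO('Supplier Name,Amount\nAcme Ltd,"£1,250.00"\nBeta Cars,600.50\n')
may = io.StringIO('Supplier Name,Amount\nAcme Ltd,750.25\n')
print(total_by_supplier(collate([april, may])))
# {'Acme Ltd': 2000.25, 'Beta Cars': 600.5}
```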
The publishing format is a Microsoft Excel workbook however you do not need Excel installed in order to view it, you only need a web browser. You do have the option to download the workbook to take advantage of the greater power of Excel and do your own analysis.
Putting the data into Excel enables us to provide summaries and visualisations over the data such as expenditure per month:
Top ten expenditures per external Supplier, Expense Type & Service Area:
All the charts are attached to objects called slicers, which make them interactive. Here’s an example of a slicer:
Clicking on a Supplier will cause the charts to display data for only that Supplier (you can select multiple Suppliers by holding down the CTRL key).
Similar to Slicers are Timelines which enable us to show data for a particular month or groups of months:
Importantly, I shall be adding new data to this Excel workbook as and when it becomes available, so check back every month to see the data changing.
The first month for which data was available was April 2012, hence when April 2013 rolls around we can start to do year-on-year comparisons, and that is when the information might start to reveal some interesting spending trends of the council.
If you’re interested in the council’s absolute total expenditure since April 2012 I show that on the first sheet:
Finally, having access to all this data enables us to discover interesting facts such as how much the council has spent with a particular chauffeur supplier:
If you find any other interesting titbits hidden in this corpus do let me know!
I encourage you to take a look and if you have any feedback please leave a comment below.
Becoming gardener
Helen, Bonnie and I recently moved into our new house in Hanworth Park and with the new house I inherited a substantial vegetable plot (the estate agent called it an orchard but given there’s only one tree in it that’s rather grandiose) that the previous owner has clearly put lots and lots of work into, as you can see:
Check out my PhotoSynth of it here: http://bit.ly/jtveggiepatch.
When we bought the house I resolved that I would try and maintain this in the same way and hopefully we could become slightly more self-sufficient in the process; anyone who knows me knows that I am not in the slightest bit green-fingered so this is actually a rather daunting task. Undeterred I ventured down to the veggie patch this morning to make my first harvest of some beetroot that the previous owner had kindly left for us. Here is my first crop:
Not exactly a bumper crop, but I am hoping I can get a good few jars of pickled beetroot outta this little lot and perhaps keep some aside to be roasted with our Christmas lunch. I have a recipe for pickling beetroot from Miles Collins (see: How to Pickle Beetroot) and I ventured out today to get all the ingredients. Tomorrow is pickling day – check back later to see how I get on!