Export the Results of a Looker Query to BigQuery


(jesse.carah) #1

In a previous post, I wrote about how you can use Data Actions to send a row of data from Looker to BigQuery. @Dimitri_Masin pondered: Could we use this approach to stream the full results of a Looker query back to BigQuery? In this post I’ll outline how to do just that by leveraging the Action Hub’s Action API.

Once again, we’ll be using Google Cloud Functions to build the action. As a refresher:

Like AWS’s Lambda, Cloud Functions let you deploy code that gets executed in response to an event. With an action, you can push JSON containing data from Looker as well as user-defined form parameters to a Cloud Function endpoint. The Cloud Function then parses the JSON, extracts the relevant values, and calls on the BigQuery SDK to stream the results to BigQuery.

In this example, we’ll be creating three Cloud Functions – one for each endpoint in the Action API. Two of these Cloud Functions will just be returning (relatively) static JSON. Let’s get started!

Configure a BigQuery Dataset and Initialize a Table

Head over to the BigQuery Console and create a new dataset for your Looker Exports.

Now, create an empty table with a schema definition. In the schema, you should include columns which you plan to push to BigQuery from Looker, as well as a timestamp column called record_created_at.

A quick note on naming conventions: This script assumes that the column names in your BigQuery table match the field names (not labels!) in the data that you’re exporting from Looker. Do not include the view name in your columns.
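To make the naming convention concrete, here is a minimal sketch of how view-scoped Looker field names map to the column names your table should use. The field names below are made up for illustration, and column_name is a hypothetical helper, not part of any SDK.

```python
def column_name(looker_field):
    """Return the BigQuery column name for a view-scoped Looker field."""
    # Looker field names look like "view_name.field_name";
    # keep only the part after the first dot.
    return looker_field.split('.', 1)[1]


# Hypothetical field names, used only for illustration.
looker_fields = ['users.id', 'users.first_name', 'orders.total_amount']
columns = [column_name(field) for field in looker_fields]
print(columns)  # ['id', 'first_name', 'total_amount']
```

So a query that selects users.first_name needs a column called first_name in the destination table.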

Here’s the table that we’ll be streaming data to in our example:

[Screenshot of the example table’s schema]

Configuring APIs

Follow the first three steps here to select your GCP project and enable API access for Cloud Functions and BigQuery.

Creating the Action List Cloud Function

We’ll start off with our first function. This will serve as the Action List Endpoint. This function will return static JSON that lists all the actions your hub exposes. Each listed action contains metadata instructing Looker how to execute your action.

  1. Navigate to https://console.cloud.google.com/functions and ensure that you’ve selected the same project in which BigQuery resides.
  2. Click Create Function, and give it a name (I’m calling mine bigquery_action_list). Select a memory allocation.
  3. Select HTTP as your trigger.
  4. Select your preferred runtime (for this example, I will use Python 3.7, but versions of Node.js are also supported).
  5. Click ‘Source’.
  6. Click ‘Edit’.
  7. Paste the following code in the script editor:
Action List Code

def action_list(request):
    r = request.get_json()
    print(r)  # so it shows up in logs

    response = """
    {
        "integrations": [{
            "name": "bigquery",
            "label": "Send results to a BigQuery Table.",
            "supported_action_types": ["query"],
            "url": "",
            "form_url": "",
            "icon_data_uri": "",
            "required_fields": [],
            "params": [{
                "name": "api_key",
                "label": "API Key",
                "required": true
            }],
            "supported_formats": ["json"],
            "supported_formattings": ["unformatted"],
            "supported_visualization_formattings": ["noapply"]
        }]
    }
    """
    return response

  1. Set ‘Function to execute’ to action_list.
  2. Save.
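If you would rather not hand-write the JSON string, a variant like the following builds the same structure from a Python dictionary and lets json.dumps take care of quoting and of true/false. This is only a sketch: build_action_list is a hypothetical helper name, and the two URLs are left empty until the other functions exist.

```python
import json


def build_action_list(execute_url, form_url):
    """Return the action list as a JSON string, mirroring the hand-written version."""
    integration = {
        'name': 'bigquery',
        'label': 'Send results to a BigQuery Table.',
        'supported_action_types': ['query'],
        'url': execute_url,
        'form_url': form_url,
        'icon_data_uri': '',
        'required_fields': [],
        'params': [{'name': 'api_key', 'label': 'API Key', 'required': True}],
        'supported_formats': ['json'],
        'supported_formattings': ['unformatted'],
        'supported_visualization_formattings': ['noapply'],
    }
    return json.dumps({'integrations': [integration]})


def action_list(request):
    # The URLs stay empty for now; they get filled in once the
    # form and execute functions have been deployed.
    return build_action_list('', '')
```

Building the response this way means a stray comma or quote cannot produce invalid JSON.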

Creating the Action Form Cloud Function

We’ll now create a new Cloud Function that will serve as the Action Form Endpoint.

If the action’s definition specifies a form, Looker will ask this endpoint for a form template that should be displayed to the end user to configure the action. Since Looker will ask this endpoint for the form every time it’s displayed, it’s possible to dynamically change the form based on whatever information you like. For example, in a chat application the form might dynamically list all the available chat channels that data can be sent to.

  1. Create a new function as you did in the previous step (same memory, runtime, etc).
  2. Give the function a name. I’m calling mine bigquery_action_form
  3. Navigate to requirements.txt and include a line for google-cloud-bigquery==1.5.0. This will allow you to use the BigQuery SDK in your function.
  4. In main.py, paste the following:
Action Form Code
import google.cloud.bigquery as bigquery
import json

def action_form(request):

    client = bigquery.Client()
    # collect list of available tables
    dataset_ref = client.dataset('streaming_inserts')
    tables_obj = list(client.list_tables(dataset_ref))
    table_options = [{'name': table.table_id, 'label': table.table_id} for table in tables_obj]

    form = """
    [{
        "name": "table",
        "label": "Table",
        "description": "Name of destination table.",
        "type": "select",
        "default": "no",
        "required": true,
        "options": """ + json.dumps(table_options) + """
    }]
    """
    return form
  1. Be sure to substitute the name of your own dataset for streaming_inserts in the client.dataset() call.

  2. Set ‘Function to execute’ to action_form
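To check the shape of the form without touching BigQuery, the JSON-building part can be pulled out into its own function and fed a plain list of table names. This is a sketch under that assumption: build_form is a hypothetical helper and the table names are invented.

```python
import json


def build_form(table_ids):
    """Return the form JSON for a list of table names (no BigQuery call involved)."""
    options = [{'name': table_id, 'label': table_id} for table_id in table_ids]
    form = [{
        'name': 'table',
        'label': 'Table',
        'description': 'Name of destination table.',
        'type': 'select',
        'default': 'no',
        'required': True,
        'options': options,
    }]
    return json.dumps(form)


# Hypothetical table names, used only for illustration.
print(build_form(['looker_exports', 'daily_orders']))
```

In the Cloud Function you would pass in the table IDs returned by client.list_tables instead of a hard-coded list.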

Creating the Action Execute Cloud Function

We’ll now create our last and most important function. This function will serve as the Action Execute Endpoint.

This is the endpoint that Looker will send the payload, form information, and other metadata in order to execute a given action. This is the main implementation of your action.
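Judging by the keys that the execute code in this post reads, the request body looks roughly like the sketch below. All values are invented for illustration, and a real payload from Looker contains more keys than the ones shown here.

```python
import json

# Hypothetical request body; only the keys read by the execute function are shown.
sample_payload = {
    'data': {'api_key': 'my-secret', 'dataset': 'streaming_inserts'},
    'form_params': {'table': 'looker_exports'},
    'scheduled_plan': {'query': {'fields': ['users.id', 'users.first_name']}},
    # The query results arrive as a JSON-encoded string, not as a nested object.
    'attachment': {'data': json.dumps([
        {'users.id': 1, 'users.first_name': 'Ada'},
        {'users.id': 2, 'users.first_name': 'Grace'},
    ])},
}

rows = json.loads(sample_payload['attachment']['data'])
table_id = sample_payload['form_params']['table']
print(table_id, len(rows))  # looker_exports 2
```

Note the second json.loads step: the attachment has to be decoded separately before the rows can be used.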

  1. Create a new function as you did in the previous step (same memory, runtime, etc).
  2. Give the function a name. I’m calling mine bigquery_action_execute
  3. Navigate to requirements.txt and include a line for google-cloud-bigquery==1.5.0. This will allow you to use the BigQuery SDK in your function.
  4. In main.py, paste the following:
Action Execute Code
import google.cloud.bigquery as bigquery
import time
import datetime
import os
import json

def verify_key(api_key):
    return api_key == os.environ.get('api_key')

def parse_request(request):
    request_json = request.get_json()
    print(request_json) # so it shows up in the logs
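    # Note: 'dataset' only appears under 'data' if the action list defines
    # a matching param alongside api_key; otherwise this raises a KeyError.
    dataset_id = request_json['data']['dataset']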
    api_key = request_json['data']['api_key']
    query_data = json.loads(request_json['attachment']['data'])
    table_id = request_json['form_params']['table']
    looker_scoped_fields = request_json['scheduled_plan']['query']['fields']
    looker_fields = [field.split('.')[1] for field in looker_scoped_fields]
    return dataset_id, api_key, query_data, table_id, looker_fields
    
def verify_fields(bq_fields, looker_fields):
    # Verify that the columns from Looker are a subset of the BigQuery columns
    return set(looker_fields) <= set(bq_fields)

def non_existant_bq_fields(bq_fields, looker_fields):
    return list(set(looker_fields) - set(bq_fields))

def remove_namespace(data):
    # Remove view namespace from Looker field names
    return [{key.split('.')[1]: value for key, value in row.items()} for row in data]

def bigquery_metadata(dataset_id, table_id):
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    bq_fields = [field.name for field in table.schema]
    return client, table, bq_fields

def stream_rows(client, table, rows):
    errors = client.insert_rows(table, rows)
    if errors:
        print(errors) # so they show up in the logs
    assert errors == []


def write_to_bigquery(request):
    dataset_id, api_key, query_data, table_id, looker_fields = parse_request(request)
    
    # Verify API key
    if not verify_key(api_key):
        # Built with json.dumps and keyed by field name so the response is valid JSON
        return json.dumps({'looker': {'success': False, 'validation_errors': {'api_key': 'Unauthorized'}}})

    client, table, bq_fields = bigquery_metadata(dataset_id, table_id)

    # Verify that the fields from the Looker query exist in BigQuery
    if not verify_fields(bq_fields, looker_fields):
        fields_missing = non_existant_bq_fields(bq_fields, looker_fields)
        message = 'field(s) ' + ', '.join(fields_missing) + ' do not exist in table ' + table_id
        # Built with json.dumps and keyed by field name so the response is valid JSON
        return json.dumps({'looker': {'success': False, 'validation_errors': {'table': message}}})

    # Prepare the data to stream to BigQuery
    data_to_stream = remove_namespace(query_data)
 
    for row in data_to_stream:
        row['record_created_at'] = datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d %H:%M:%S')
    
    # Stream the data to BigQuery
    stream_rows(client, table, data_to_stream)
    return '{"looker": {"success": true}}'
  1. Set ‘Function to execute’ to write_to_bigquery.
  2. Click the ‘More’ button at the bottom of the page and click ‘Add variable’ to add an environment variable with the following values:
  • Name: api_key
  • Value: < SOME STRING >

We will be using this api_key to authenticate requests coming from Looker. Save the API key for later.
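Before deploying, it can help to dry-run the data preparation steps locally without calling BigQuery. The sketch below combines the namespace removal, the column check, and the timestamp into one function. prepare_rows is a hypothetical helper, the sample rows and column names are invented, and it uses a UTC timestamp, which is an assumption on my part.

```python
import datetime


def prepare_rows(query_data, bq_fields):
    """Strip view names, check columns, and stamp each row, without calling BigQuery."""
    # "users.id" -> "id", as in remove_namespace
    rows = [{key.split('.', 1)[1]: value for key, value in row.items()}
            for row in query_data]

    # Every Looker field must exist as a column in the destination table
    missing = sorted({key for row in rows for key in row} - set(bq_fields))
    if missing:
        raise ValueError('field(s) ' + ', '.join(missing) + ' do not exist in table')

    # Assumption: stamp rows in UTC
    now = datetime.datetime.now(datetime.timezone.utc)
    created_at = now.strftime('%Y-%m-%d %H:%M:%S')
    for row in rows:
        row['record_created_at'] = created_at
    return rows


# Hypothetical query results and table columns, for illustration only.
sample_rows = [{'users.id': 1, 'users.first_name': 'Ada'}]
prepared = prepare_rows(sample_rows, ['id', 'first_name', 'record_created_at'])
print(prepared)
```

If a field is missing from the table, the function raises instead of returning rows, which mirrors the point at which the Cloud Function returns an error to Looker.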

Reference the other two actions in your action_list function.

  1. So that Looker knows where to find the action_execute and action_form endpoints, navigate to the trigger tab of each of those cloud functions, and copy each URL.
  2. Navigate back to the action_list function and paste in the URLs:
response = """
			{
			"integrations": [{
				"name": "bigquery",
				"label": "Send results to a BigQuery Table.",
				"supported_action_types": ["query"],
				"url": "<action_execute function URL>",
				"form_url": "<action_form function URL>",
....

Add the Action to Your Looker Instance

Now that we’ve created our cloud functions, we are going to configure the Action in Looker.

  1. Navigate to Admin > Actions in your Looker instance.
  2. Scroll to the bottom and click ‘Add Action Hub’
  3. Paste your action_list function URL in the text box and click ‘Add Action Hub’
  4. Click ‘Enable’ to configure the action.
  5. Paste your API key in and click the ‘Enable’ button.
  6. Looker should flash a success message if everything is set up correctly.

Using the BigQuery Action

The BigQuery Action can be accessed via the native Schedules interface. Create a query in the Explore section of Looker, and when you’re ready to send the results to BigQuery, click the gear icon and hit Send or Schedule. You’ll now notice Google BigQuery as one of your destination options.

Select the table you wish to export the data to, and hit send! You can now monitor the progress in the Scheduler History section of the admin panel.

Known limitations:

  • Right now, you are unable to stream unlimited results to BigQuery. I’ll be looking to build that functionality into the action in the coming weeks.
  • Consider adding in additional authentication layers to better secure the function.
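As one small step in that direction, the key check itself can be hardened. The sketch below is a possible drop-in replacement for verify_key, assuming the key is still stored in the api_key environment variable. It rejects requests when that variable is unset (a plain == comparison would accept a null key in that case) and compares keys with hmac.compare_digest from the standard library, which is designed to resist timing attacks.

```python
import hmac
import os


def verify_key(api_key):
    """Return True only if api_key matches the configured key."""
    expected = os.environ.get('api_key')
    # Reject if no key is configured or the incoming value is not a string
    if not expected or not isinstance(api_key, str):
        return False
    # Constant-time comparison of the two keys
    return hmac.compare_digest(api_key.encode('utf-8'), expected.encode('utf-8'))
```

This does not replace proper authentication on the function itself; it only tightens the shared-secret check used in this post.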
