AirSafe Tracking Stream Examples

This page provides examples of how to use the AirSafe Tracking Stream endpoint across cloud providers and infrastructure types.

First we start with Python on AWS Lambda.

Python on AWS Lambda

You can use AirSafe Tracking Stream to stream target updates using AWS Lambda.

In this example we use Python to listen to v2/targets/stream on AWS Lambda. We log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. Before reaching a set time-out we gracefully disconnect right after receiving a position_token; this way we avoid duplicate target update delivery. We store the position_token to re-connect to the same point in the stream on the next scheduled Lambda call.
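The flow described above can be sketched in a few lines of Python. This is a minimal illustration and not the code from the repository: it assumes that the stream delivers one JSON object per line, and that each object carries either a `target` key or a `position_token` key. The function name `consume` and its parameters are hypothetical.

```python
import json
import time


def consume(lines, stop_after_seconds, on_target, clock=time.monotonic):
    """Process stream lines and return the last position_token seen.

    Stops right after the first position_token that arrives once
    `stop_after_seconds` have elapsed, so that no target update is read
    beyond the token that will be stored.
    """
    started = clock()
    last_token = None
    for raw in lines:
        if not raw:
            continue  # skip empty keep-alive lines
        message = json.loads(raw)
        if "target" in message:  # assumed key name
            on_target(message["target"])
        elif "position_token" in message:  # assumed key name
            last_token = message["position_token"]
            if clock() - started >= stop_after_seconds:
                break  # graceful disconnect: nothing was read past this token
    return last_token
```

Because the loop only ever stops directly after a token, reconnecting with the returned token should resume the stream without replaying target updates that were already logged.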

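The components of the infrastructure are an AWS Lambda function that consumes the stream, an S3 bucket that stores the last position_token, and a CloudWatch Event that triggers the Lambda on a schedule.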

You can find and download the source code for this example at http://github.com/spireglobal/aviation.

First, an overview of the example.

Overview

After reading this section you will know the purpose of each file in this example, and which role it plays.

The next section lists all prerequisites, with links and notes on how to install them.

Prerequisites

To execute this tutorial you need the following tools and accounts on your system:

Both pyenv and Pipenv can be installed via pipx.

Following the installation of these prerequisites you can walk through the next section to set up the example on your account.

Setup

This section contains the necessary commands to build and deploy the example, and describes what they do.

  1. Clone the repository, and navigate to the aws-lambda folder.
  2. Execute pyenv init and follow the instructions to get pyenv ready to load the correct Python version.
  3. Run pyenv install 3.6-dev to install the latest version of Python 3.6.
  4. Set the Python version for the current shell to 3.6-dev by running pyenv shell 3.6-dev; python --version should now return Python 3.6.11+ or similar.
  5. Create a virtual environment by executing pipenv shell. This creates and loads a virtual environment.
  6. First we test the client and handler to see if we have all dependencies installed.
    1. Run pipenv sync --dev to install all development dependencies as specified by Pipfile.lock.
    2. Test the client and handler by running pytest --cov=. -vv. The following steps assume that this test passed.
  7. Now we minimize the installed dependencies, to keep the lambda handler small. We remove the virtual environment, and recreate it, but install only the production dependencies. Do this by running pipenv --rm and then pipenv sync, this time without the --dev parameter.
  8. At this point we tested that the source code works, and we have production dependencies installed. Now we assemble all code and dependencies into a zip file for AWS Lambda. Do this by calling ./assemble-lambda.sh. This creates a file called lambda.zip in the current working directory.
  9. It is time to create the infrastructure on AWS. Run terraform apply . to create an S3 bucket, a Lambda function, and a CloudWatch Event that triggers the Lambda every 5 minutes. Terraform asks for the value of three variables: airsafe2_token, the AirSafe2 Stream Token issued by Spire; profile, the AWS credential set you want to use (defined by the heading in ~/.aws/credentials); and region, the AWS region the service is deployed in. Follow the instructions given by Terraform, and confirm if you agree to create the infrastructure and roles. Note that this creates infrastructure that incurs fees determined by AWS. If the S3 bucket name is already in use, choose a different value for bucket in resource "aws_s3_bucket" "last-position-token" in terraform.tf. Terraform resolves dependencies between components automatically, so there is no need to reconfigure settings such as the environment variables of the Lambda.
  10. Navigate to the AWS Console (e.g. for region us-west-1: https://us-west-1.console.aws.amazon.com/lambda/home?region=us-west-1#/functions/airsafe-2-streamer?tab=monitoring) to see Lambda invocations, and click on "View logs in CloudWatch" to see logs.

In this section you created a Lambda function, an S3 bucket, and a CloudWatch event to schedule the Lambda function. At this point these components should be active on AWS, and the Lambda should be invoked every 5 minutes, logging target updates over Hartsfield–Jackson Atlanta International Airport.

The next section discusses some aspects of this setup that are worth considering.

Discussion of the Result

While the working stream should be visible now in CloudWatch Logs, here are some things to consider when consuming the AirSafe Tracking Stream product.

Architecture
CloudWatch Events triggers the Lambda periodically, and S3 persists state across invocations. Because the trigger sits outside the running system, the stream can recover automatically if, for example, a write operation to S3 or a Lambda execution fails.
A less resilient approach would be to trigger the Lambda from SNS and write the last position_token back to SNS to trigger the next Lambda invocation. With this circular dependency, a single failure halts the system.
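To illustrate why an external trigger combined with persisted state recovers on its own, here is a small sketch of one scheduled invocation. It is not the handler from the repository: `InMemoryTokenStore` stands in for the S3 object, and `stream` is a hypothetical callable that connects at the given token (or at the latest position when the token is None) and returns the last position_token it received.

```python
class InMemoryTokenStore:
    """Stand-in for the S3 object that holds the last position_token."""

    def __init__(self):
        self._token = None

    def load(self):
        return self._token

    def save(self, token):
        self._token = token


def run_invocation(store, stream):
    """One scheduled invocation: resume, stream, persist."""
    start_token = store.load()
    try:
        new_token = stream(start_token)
    except Exception:
        # A failed run must not erase the stored token; the next
        # scheduled invocation simply resumes from the same point.
        new_token = None
    if new_token is not None:
        store.save(new_token)
    return new_token
```

A failed invocation leaves the stored token untouched, so the next trigger from the scheduler picks up where the last successful run ended. In the SNS variant described above there would be no next trigger after a failure.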

Graceful Timeout and Keeping Up With the Stream
Handling timeouts gracefully means that for each time interval at which we trigger the Lambda, we have less than that interval to retrieve all newly available data. This effect gets more pronounced the shorter the time interval.
Graceful timeout means that the client disconnects from the stream when it receives the last position_token that is guaranteed to arrive before the timeout (v2/targets/stream sends a position_token at least every 15 seconds, plus latency). With a hard Lambda limit of e.g. 300 seconds, we set a timeout of about 295 seconds for the stream client. Timing out gracefully on a 295-second timeout means the client terminates the connection at the first position_token it receives within the last 20 seconds before the timeout (i.e. sometime after 275 seconds). In the worst case the client therefore has 275 seconds to download 300 seconds worth of data (~92%). With a hard Lambda timeout of 30 seconds, the client would have 5 seconds to fetch 30 seconds worth of data (~17%).
When timing out gracefully, it is therefore important to keep timeouts high. An alternative is to store each position token as it arrives, listen until the hard timeout, and deal with the duplicate target updates that arrive when reconnecting.
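The arithmetic above can be reproduced with a short helper. The function name and parameter names are hypothetical; the default values of 5 seconds (margin between the hard limit and the client timeout) and 20 seconds (allowance for the last position_token) are the ones used in the text.

```python
def stream_budget(hard_limit_s, safety_margin_s=5, token_allowance_s=20):
    """Return (client_timeout, earliest_disconnect, usable_fraction).

    client_timeout:      timeout configured on the stream client
    earliest_disconnect: earliest moment the client may disconnect gracefully
    usable_fraction:     worst-case share of the interval spent streaming
    """
    client_timeout = hard_limit_s - safety_margin_s
    earliest_disconnect = client_timeout - token_allowance_s
    return client_timeout, earliest_disconnect, earliest_disconnect / hard_limit_s


for limit in (300, 30):
    timeout, earliest, fraction = stream_budget(limit)
    print(f"hard limit {limit}s: client timeout {timeout}s, "
          f"streaming for at least {earliest}s ({fraction:.0%})")
```

For a 300-second limit this yields 275 seconds of guaranteed streaming time, and for a 30-second limit only 5 seconds, which matches the figures above.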

Data Processing and Keeping Up With the Stream
The volume of data in the stream can be high, depending on the filter parameters used. Spending a lot of time processing each target update can cause the consumer to fall behind in the stream. At a certain point the consumer will be so far behind that the server disconnects, and the consumer misses data that was available.
It is useful to keep the amount of processing that happens directly at data ingestion very low. Any processing that takes more than a minimal amount of time can instead be executed asynchronously or in a scalable environment.
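One way to keep ingestion cheap is to hand each target update to a queue and let a separate worker do the slow part. The following is a sketch using only the Python standard library; `start_worker` and its parameters are hypothetical names, and a production system might use a managed queue or stream processor instead.

```python
import queue
import threading


def start_worker(process, maxsize=10000):
    """Start a background thread that calls `process` for every queued item.

    Returns the queue and the thread. Putting None on the queue stops
    the worker after all earlier items have been processed.
    """
    work = queue.Queue(maxsize=maxsize)

    def worker():
        while True:
            item = work.get()
            if item is None:  # sentinel: shut down
                work.task_done()
                break
            try:
                process(item)
            finally:
                work.task_done()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return work, thread
```

The ingestion callback then only calls `work.put_nowait(target)`, which returns immediately. If the backlog exceeds `maxsize`, `put_nowait` raises `queue.Full`, which is a useful signal that processing cannot keep up with the stream.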

Cleanup

To destroy the infrastructure, execute terraform destroy .

Python on GCP Cloud Functions

After following this tutorial you will have a working target update stream, consuming AirSafe /v2/targets/stream from Python, on Google Cloud Functions.

In this example we log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. Before reaching a set time-out we gracefully disconnect right after receiving a position_token; this way we avoid duplicate target update delivery. We store the position_token to re-connect to the same point in the stream on the next scheduled Cloud Functions call.

The GCP services comprising the infrastructure are:

You can find and download the source code for this example at http://github.com/spireglobal/aviation.

First, an overview of the example.

Overview

This overview outlines the purpose of each file in this example, and which role it plays.

The next section lists all prerequisites, with links and notes on how to install them.

Prerequisites

To execute this tutorial you need the following accounts and tools on your system:

Having these prerequisites in place you can walk through the next section to set up the example on your account.

Setup

The first part of this section contains and describes the necessary commands to create the infrastructure and deploy the example. The second part shows how to set up the development environment, and adapt the code to your needs.

Creating a GCP project

Retrieve and set an environment variable with your organization ID.
We will use this variable to create an empty project.

gcloud organizations list
export TF_VAR_org_id="<YOUR-ORG-ID>"

Create an empty project.
Retrieve the project ID and store it in an environment variable. Terraform will use this variable to know which project to create the services in.

gcloud projects create airsafe-v2-stream --organization $TF_VAR_org_id --set-as-default
gcloud projects list
export TF_VAR_project_id="<THE-airsafe-v2-stream-PROJECT-ID>"

Retrieve the billing accounts, and store the relevant one in an environment variable.

gcloud alpha billing accounts list
export TF_VAR_billing_account="<YOUR-BILLING-ACCOUNT_ID>"

Enable Cloud Billing for your project, so that we can use the services that depend on it.

gcloud services enable cloudbilling.googleapis.com
gcloud beta billing projects link $TF_VAR_project_id --billing-account $TF_VAR_billing_account

Enable the services we will use further on. Cloud Billing is a requirement for Cloud Scheduler.

gcloud services enable iam.googleapis.com
gcloud services enable cloudfunctions.googleapis.com
gcloud services enable storage.googleapis.com
gcloud services enable cloudscheduler.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable cloudresourcemanager.googleapis.com

The commands above prepare the project and environment so that Terraform can do the rest.

They create a new, empty project, set up billing, and set the environment variables that Terraform uses for the variables in terraform.tf.

Before continuing, clone git@github.com:spireglobal/aviation.git and navigate to the google-cloud-functions folder on the command line.

Setting up the project for Terraform

Create an IAM service account for Terraform.

gcloud iam service-accounts create terraform --display-name "Terraform admin account"

Grant the Terraform service account the Editor and Cloud Functions Admin roles, so that it can create the infrastructure.

gcloud projects add-iam-policy-binding $TF_VAR_project_id  --member "serviceAccount:terraform@$TF_VAR_project_id.iam.gserviceaccount.com" --role roles/editor
gcloud projects add-iam-policy-binding $TF_VAR_project_id  --member "serviceAccount:terraform@$TF_VAR_project_id.iam.gserviceaccount.com" --role roles/cloudfunctions.admin

Set the location of the credentials file for the Terraform service account.

export TF_CREDS="$HOME/.config/gcloud/$TF_VAR_project_id-terraform-admin.json"

Create keys for the Terraform service account, and set environment variables for gcloud.

gcloud iam service-accounts keys create "$TF_CREDS" --iam-account "terraform@$TF_VAR_project_id.iam.gserviceaccount.com"
export GOOGLE_APPLICATION_CREDENTIALS=$TF_CREDS
export GOOGLE_PROJECT=$TF_VAR_project_id

Set the region to create the infrastructure in, and make your AirSafe2 Stream token available to Terraform.

export TF_VAR_airsafe2_token="<YOUR-AIRSAFE-2-STREAM-TOKEN>"
export TF_VAR_region="us-west2"

To use Terraform, the project needs a service account that is allowed to create new service instances and set Cloud Function IAM roles.

Furthermore, terraform.tf makes use of variables, which are set in this section using environment variables with the prefix TF_VAR_.

Creating the infrastructure

Create a Google App Engine app. This is a requirement for Terraform to create a Cloud Scheduler Job.

gcloud app create --region=$TF_VAR_region

Initialize Terraform, and create and deploy the infrastructure.

terraform init
terraform apply .

Infrastructure creation might take a few minutes since Google Cloud Build will build and deploy the Cloud Function behind the scenes.

Discussion of the Result

In https://console.cloud.google.com/functions you should now see the streamer function. Click on it, and go to "View Logs" to see the function invoked regularly, logging incoming target updates, gracefully disconnecting, and reconnecting to the stream seamlessly, using position_tokens.

Note: Propagation of IAM roles and permissions might take a while, so the first one or two invocations by Cloud Scheduler might fail; later invocations should succeed once the permissions have propagated.

Note: The first call of the streamer Cloud Function will report that it can't find a valid position_token and starts without one. Following invocations will re-start from their respective previous position_token.

Development

The core functionality for further processing, filtering or forwarding of target updates is located in main.py in the class TargetProcessor. This section shows how to modify and re-deploy that code.

For this example the target processor only logs target updates. For a real use-case the callback might add the target update to a list, to allow batch-processing after the time-out; it might forward the target update to a stream processor, or to a Pub/Sub topic.
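As an illustration of the batch-processing variant, the sketch below collects target updates while the stream is open and hands them over in one call afterwards. The interface of the actual TargetProcessor in main.py is not shown in this document, so the class name, the `callback` and `flush` methods, and the `handle_batch` parameter are all hypothetical.

```python
class BatchingTargetProcessor:
    """Hypothetical processor that collects target updates for bulk handling."""

    def __init__(self, handle_batch):
        self._handle_batch = handle_batch
        self._batch = []

    def callback(self, target):
        # Cheap on purpose: no I/O happens while the stream is open.
        self._batch.append(target)

    def flush(self):
        """Pass all collected updates to handle_batch; return how many."""
        batch, self._batch = self._batch, []
        if batch:
            self._handle_batch(batch)
        return len(batch)
```

The function would call `flush()` once after the graceful disconnect, for example to write all collected updates to storage in a single request.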

To modify the Python code of the GCP Cloud Function (main.py) or the API wrapper (client.py), first install pyenv, then cd into the function directory.

  1. Execute pyenv init and follow the instructions to get pyenv ready to load the correct Python version.
  2. Run pyenv install $(cat .python-version) to install the required Python version on your system.
  3. Run pyenv shell $(cat .python-version) to load this python version into the active shell.
  4. Run pip install pipenv to install pipenv into the active pyenv virtual environment.
  5. Run pipenv --python $(cat .python-version) to create a virtual environment for this project.
  6. Run pipenv shell to load it into the active shell.
  7. Run pipenv sync --dev to install development and production requirements.
  8. Run pipenv --venv to show the virtual environment location, which you need to set the environment correctly in your IDE.

Updating Dependencies for Deployment

Cloud Build for Cloud Functions installs dependencies that are specified in requirements.txt. To mirror the content of Pipfile.lock in requirements.txt, execute pipenv --rm && pipenv sync && pip list --format=freeze > requirements.txt in the pipenv shell.

Redeploying

terraform.tf is set up so that every execution of terraform apply . triggers a re-deployment: the call to timestamp() in the streamer source name forces a re-creation of that resource. Running terraform apply . is therefore enough to deploy new code.

Cleanup

To remove all resources you can do one of two things:

Python in Docker

After following this tutorial you will have a working target update stream, consuming AirSafe /v2/targets/stream from Python, in a Docker container, without timeouts, and with the ability to reconnect to the stream at a position shortly before a disconnection.

In this example we log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. At every position_token we receive, we write the token to the local file-system. The last token is written and then renamed to make the update operation atomic. This avoids corrupt or half-written position_tokens. In case of a disconnect the stream reconnects at the last available position_token, to avoid missing target updates.

This example is best used as a base for a persistent connection that is not expected to be interrupted regularly, e.g. a stream on EC2 or Google Compute Engine. Here we do not time out the connection, but keep it alive indefinitely, until either the server disconnects or the consumer fails. The consumer can then reconnect at the last known position_token to avoid missing target updates, but it will likely receive some duplicate target updates.
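If duplicates after a reconnect are a problem for downstream consumers, a bounded "recently seen" filter is one possible remedy. The sketch below is not part of the example repository; it assumes that a re-delivered target update is identical in content to its first delivery, and the class name and capacity are hypothetical.

```python
import hashlib
import json
from collections import OrderedDict


class RecentlySeen:
    """Remembers a bounded number of target updates to detect repeats."""

    def __init__(self, capacity=100000):
        self._capacity = capacity
        self._seen = OrderedDict()

    def is_duplicate(self, target):
        # Hash a canonical serialization so that key order does not matter.
        canonical = json.dumps(target, sort_keys=True).encode("utf-8")
        key = hashlib.sha256(canonical).hexdigest()
        if key in self._seen:
            self._seen.move_to_end(key)
            return True
        self._seen[key] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # forget the oldest entry
        return False
```

The filter lives in process memory, so it only helps when the reconnect happens inside the same process. After a container restart it starts empty, and duplicates from before the restart pass through.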

You can find and download the source code for this example at http://github.com/spireglobal/aviation.

First, an overview of the example.

Overview

This overview outlines the purpose of each file in this example, and which role it plays.

The next section lists all prerequisites, with links and notes on how to install them.

Prerequisites

To execute this tutorial you need the following accounts and tools on your system:

Having these prerequisites in place you can walk through the next section to set up the example on your account.

Setup

The first part of this section contains and describes the necessary commands to build the Docker image and run the example on your system. The second part shows how to set up the development environment, and adapt the code to your needs.

Building the Docker Image

Build and tag the Docker image.

docker build . -t airsafe-2-stream-example

To build the Docker image fetch the source code from http://github.com/spireglobal/aviation. In the terminal, navigate into the docker folder. In that folder, execute the docker build command.

In the next step we pass the AirSafe 2 stream token to the image, and mount a folder where the streaming process can persist position_tokens, to allow it to reconnect after potential failure.

Starting a Docker Container

Start the container locally, setting the required environment variable and the position_token mount point.

docker run --rm -it -v $(pwd)/position_tokens:/root/position_tokens -e AIRSAFE2_TOKEN=$AIRSAFE2_TOKEN airsafe-2-stream-example

To start the container prepare as follows.

After these preparatory steps, execute the docker run command.

Discussion of the Result

After a few seconds you should start seeing a stream of logged target updates in the terminal where you started the Docker container. In a real system you would adapt the TargetProcessor class to do more than just log the incoming target updates.

In the folder position_tokens you should see a rolling list of timestamped position_tokens. The alphabetically last file contains the last position_token that has been sent by the stream. Looking into main.py you can see that the PositionTokenProcessor first writes incoming position_tokens timestamped with a 0 value and then renames them, which is an atomic operation on most systems. In case the write operation fails, the corrupt file will not interfere with the restart, since the restart reads the last available position_token, and ignores the previous ones.
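The write-then-rename idea can be sketched as follows. This is not the PositionTokenProcessor from main.py: the file naming scheme (a zero-padded timestamp with a `.token` suffix, and a `.tmp` suffix while writing) and both function names are assumptions made for this illustration.

```python
import os
import time


def write_token(directory, token, now=time.time):
    """Write `token` to a timestamped file in `directory` atomically."""
    os.makedirs(directory, exist_ok=True)
    # Fixed-width, zero-padded names sort alphabetically in time order.
    final_path = os.path.join(directory, "{:020.6f}.token".format(now()))
    temp_path = final_path + ".tmp"
    with open(temp_path, "w") as handle:
        handle.write(token)
        handle.flush()
        os.fsync(handle.fileno())  # make sure the bytes reach the disk
    os.replace(temp_path, final_path)  # the rename publishes the file
    return final_path


def read_latest_token(directory):
    """Return the newest completely written token, or None if there is none."""
    try:
        names = sorted(n for n in os.listdir(directory) if n.endswith(".token"))
    except FileNotFoundError:
        return None
    if not names:
        return None
    with open(os.path.join(directory, names[-1])) as handle:
        return handle.read()
```

A file that was only partially written keeps its `.tmp` suffix and is never picked up by `read_latest_token`, so a crash during a write cannot corrupt the restart.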

The stream stays connected unless the server disconnects or something fails on the client side.

This set-up can be called from a Linux service file to enable auto-reconnection after a disconnect or failure. The image can be built locally or in CI / CD, and Ansible can deploy the service file to load the image and start the container.

Note: The first start of the container will report that it can't find a valid position_token and starts without one at the latest position in the stream. Following invocations will re-start from their respective previous position_token.

Development

The core functionality for further processing, filtering or forwarding of target updates is located in main.py in the class TargetProcessor. This section shows how to modify and re-deploy that code.

For this example the target processor only logs target updates. For a real use-case the callback might add the target update to a list, to allow periodic batch-processing; it might forward the target update to a stream processor, or to a Pub/Sub topic.

To modify the Python code of the example (main.py) or the API wrapper (client.py), first install pyenv, then cd into the docker directory.

  1. Execute pyenv init and follow the instructions, to get pyenv ready to load the correct python version.
  2. Run pyenv install $(cat .python-version) to install the required Python version on your system.
  3. Run pyenv shell $(cat .python-version) to load this python version into the active shell.
  4. Run pip install pipenv to install pipenv into the active pyenv virtual environment.
  5. Run pipenv --python $(cat .python-version) to create a virtual environment for this project.
  6. Run pipenv shell to load it into the active shell.
  7. Run pipenv sync --dev to install development and production requirements.
  8. Run pipenv --venv to show the virtual environment location, which you need to set the environment correctly in your IDE.

Cleanup

To stop the active container, press Ctrl + C.

To remove the created Docker image, execute docker image rm airsafe-2-stream-example.