AirSafe Tracking Stream Examples
Here you will find examples of how to use the AirSafe Tracking Stream endpoint across cloud providers and infrastructure types.
We start with Python on AWS Lambda.
Python on AWS Lambda
You can use AirSafe Tracking Stream to stream target updates using AWS Lambda.
In this example we use Python to listen to v2/targets/stream on AWS Lambda. We log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. Before reaching a set time-out we gracefully disconnect right after receiving a position_token; this way we avoid duplicate target update delivery. We store the position_token to re-connect to the same point in the stream on the next scheduled Lambda call.
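The graceful-disconnect behaviour described above can be sketched in Python. This is a minimal illustration, not the shipped client: `consume_until_deadline` and the message shapes are assumptions, and `lines` stands in for the HTTP response body (e.g. `requests.get(..., stream=True).iter_lines()`).

```python
import json
import time

def consume_until_deadline(lines, deadline_s, clock=time.monotonic):
    """Consume newline-delimited JSON messages until the soft deadline,
    disconnecting at the first position_token received after it."""
    start = clock()
    last_token = None
    for raw in lines:
        msg = json.loads(raw)
        if "position_token" in msg:
            last_token = msg["position_token"]
            # Past the soft deadline: stop right after a token, so the next
            # invocation can resume without duplicate target updates.
            if clock() - start >= deadline_s:
                break
        elif "target" in msg:
            print("target update:", msg["target"]["icao_address"])
    return last_token
```

The returned token is what gets persisted (to S3 in this example) for the next scheduled invocation.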
The components of the infrastructure are:
- AWS Lambda to configure the connection and to receive target updates;
- AWS Cloudwatch Events to trigger Lambda regularly; and
- AWS S3 to store the location of the stream at time-out.
You can find and download the source code for this example at http://github.com/spireglobal/aviation.
First, an overview of the example.
Overview
After reading this section you will know the purpose of each file in this example, and which role it plays.
- `client.py` and `client_test.py` contain production-ready sample client code that wraps the v2/targets/stream API. The client exposes target updates via callbacks, and handles graceful disconnection to avoid duplicate target update delivery.
- `handler.py` uses `client.py` and manages loading and storing position_tokens, which encode the position in the stream that the client has progressed to. This is also where the TargetProcessor class is located: this class processes target updates as they come in, and exposes a callback function to do so.
- `terraform.tf` contains the infrastructure definition for AWS. Using Terraform 0.13 you can create the Lambda function, S3 bucket and CloudWatch Event.
- `Pipfile` and `Pipfile.lock` define all Python package dependencies for the client and the handler, like boto3 to write to S3, the requests package to easily call network resources, and pytest for testing and development.
- `assemble-lambda.sh` contains a script that assembles all installed Python packages and local Python files into one zip file that can be uploaded to AWS Lambda via the AWS CLI, Terraform, or the AWS console.
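As a rough illustration of the flow `handler.py` implements per invocation, here is a sketch with the S3 load/store abstracted behind plain callables. All names here are assumptions for illustration, not the module's actual API.

```python
def run_invocation(load_token, run_stream, store_token):
    """One scheduled invocation: resume, stream until timeout, persist."""
    token = load_token()            # returns None on the very first run
    new_token = run_stream(token)   # connects, processes updates, disconnects
    if new_token is not None:
        store_token(new_token)      # persist progress for the next invocation
    return new_token
```

In the real handler, `load_token` and `store_token` would read from and write to the S3 bucket via boto3.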
The next section lists all prerequisites, with links and notes on how to install them.
Prerequisites
To execute this tutorial you need the following tools and accounts on your system:
- An AWS account, with configured profiles, e.g. in `~/.aws/credentials`;
- Terraform 0.13 to create the AWS infrastructure;
- Access to a bash shell with `zip` on the PATH to execute the `./assemble-lambda.sh` script;
- pyenv to download and load the correct Python version;
- Pipenv to create the virtual environment and install dependencies; and
- Git to download the source code.
Both pyenv and Pipenv can be installed via pipx.
Following the installation of these prerequisites you can walk through the next section to set up the example on your account.
Setup
This section contains the necessary commands to build and deploy the example, and describes what they do.
- Clone the repository, and navigate to the `aws-lambda` folder.
- Execute `pyenv init` and follow the instructions, to get pyenv ready to load the correct Python version.
- Run `pyenv install 3.6-dev` to install the latest version of Python 3.6.
- Set the local Python version to 3.6-dev by running `pyenv shell 3.6-dev`; `python --version` should now return `Python 3.6.11+` or similar.
- Create a virtual environment by executing `pipenv shell`. This creates and loads a virtual environment.
- First we test the client and handler to see if we have all dependencies installed.
  - Run `pipenv sync --dev` to install all development dependencies as specified by `Pipfile.lock`.
  - Test the client and handler by running `pytest --cov=. -vv`. The following steps assume that this test passed.
- Now we minimize the installed dependencies, to keep the Lambda handler small. We remove the virtual environment and recreate it, but install only the production dependencies. Do this by running `pipenv --rm` and then `pipenv sync`, this time without the `--dev` parameter.
- At this point we have tested that the source code works, and we have production dependencies installed. Now we assemble all code and dependencies into a zip file for AWS Lambda. Do this by calling `./assemble-lambda.sh`. This creates a file called `lambda.zip` in the current working directory.
- It is time to create the infrastructure on AWS. Run `terraform apply .` to create an S3 bucket, a Lambda function, and a CloudWatch Event that will trigger the Lambda every 5 minutes. Terraform will ask for the value of three variables: `airsafe2_token`, which is the AirSafe2 Stream Token that has been issued by Spire; `profile`, which is the AWS credential set you want to use (defined by the heading in `~/.aws/credentials`); and `region`, which determines the AWS region the service will be deployed in. Follow the instructions given by Terraform, and confirm if you agree to create the infrastructure and roles. Note that this will create infrastructure that will incur fees determined by AWS. Note also: if the S3 bucket name is in use, it might be necessary to choose a different value for `bucket` in `resource "aws_s3_bucket" "last-position-token"` in `terraform.tf`. Terraform resolves dependencies between components automatically, so there is no need to reconfigure environment variables in e.g. the Lambda.
- Navigate to the AWS Console (e.g. for region us-west-1: https://us-west-1.console.aws.amazon.com/lambda/home?region=us-west-1#/functions/airsafe-2-streamer?tab=monitoring) to see Lambda invocations, and click on "View logs in CloudWatch" to see logs.
In this section you created a Lambda function, an S3 bucket, and a CloudWatch event to schedule the Lambda function. At this point these components should be active on AWS, and called every 5 minutes, logging target updates over Hartsfield–Jackson Atlanta International Airport.
The discussion below mentions some aspects of this setup to consider.
Discussion of the Result
While the working stream should be visible now in CloudWatch Logs, here are some things to consider when consuming the AirSafe Tracking Stream product.
Architecture
CloudWatch Events triggers the Lambda periodically, and S3 is used to persist state across invocations. Having an
event trigger outside the running system ensures that the stream has a chance to recover automatically in case e.g. a write operation to S3 or a Lambda execution fails.
A different approach that would not show this kind of resiliency would be to trigger the Lambda from SNS, and write the
last position_token to SNS again to trigger the next Lambda invocation. This circular dependency means that one failure
will halt the system.
Graceful Timeout and Keeping Up With the Stream
Handling timeouts gracefully means that for each time-interval that we trigger the lambda, we have less than that
interval to retrieve all newly available data. This effect gets more pronounced the shorter the time-interval.
Graceful timeout means that the client will disconnect from the stream when it receives its last guaranteed position
token (v2/targets/stream sends a position_token at least every 15 seconds + latency). So having the Lambda limited
to e.g. 300 seconds as a hard limit means we will set a timeout of about 295 seconds for the stream client. Gracefully
timing out on a 295-second timeout means the client will terminate the connection as soon as it receives a
position_token 20 seconds before the timeout (i.e. sometime after 275 seconds). This means that the client has 275
seconds to download 300 seconds worth of data (~92%). Setting a hard Lambda timeout of 30 seconds would mean 5 seconds to
fetch 30 seconds worth of data (~17%).
It is important to keep timeouts high, when gracefully timing out. An alternative would be to store each position token
as it arrives, listen until receiving a hard timeout, and deal with some duplicate target updates that will
arrive when reconnecting.
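The arithmetic above can be captured in a small helper; `token_margin_s` is the 15-second token interval plus latency margin (the 20 seconds used in the discussion), and the function name is illustrative.

```python
def usable_fraction(trigger_interval_s, client_timeout_s, token_margin_s=20):
    """Fraction of the trigger interval available for downloading data when
    disconnecting gracefully at the first token before the client timeout."""
    window = client_timeout_s - token_margin_s
    return window / trigger_interval_s

print(round(usable_fraction(300, 295), 2))  # 0.92 for the 5-minute schedule
print(round(usable_fraction(30, 25), 2))    # 0.17 for a 30-second schedule
```

This makes the trade-off concrete: the shorter the trigger interval, the smaller the share of it left for actually fetching data.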
Data Processing and Keeping Up With the Stream
The volume of data in the stream can be high, depending on the filter parameters used. Spending a lot of time processing each target update can mean that the consumer falls behind in the stream. At a certain point the consumer will be so far behind that the server will disconnect, and the consumer will miss available data.
It is useful to keep the amount of processing that is happening directly at data ingestion very low. Instead, any
processing that takes more than a minimal amount of time can be executed in a scalable environment or asynchronously.
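One way to keep ingestion cheap is to only enqueue in the callback and do the slow work elsewhere. A minimal single-process sketch with assumed names (not part of the example code); in a real system the worker side could be a thread pool, a stream processor, or an external queue service:

```python
import queue
import threading

updates = queue.Queue()
processed = []

def on_target_update(update):
    # Called at ingestion: only enqueue, so the stream consumer never blocks.
    updates.put(update)

def worker():
    # Expensive processing happens off the ingest path.
    while True:
        update = updates.get()
        if update is None:  # sentinel used to shut the worker down
            break
        processed.append(update)

t = threading.Thread(target=worker)
t.start()
```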
Cleanup
To destroy the infrastructure, execute `terraform destroy .`
Python on GCP Cloud Functions
After following this tutorial you will have a working target update stream, consuming AirSafe /v2/targets/stream from Python, on Google Cloud Functions.
In this example we log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. Before reaching a set time-out we gracefully disconnect right after receiving a position_token; this way we avoid duplicate target update delivery. We store the position_token to re-connect to the same point in the stream on the next scheduled Cloud Functions call.
The GCP services comprising the infrastructure are:
- Google Cloud Function to configure the connection and to receive target updates;
- Google Cloud Scheduler Job to trigger the Cloud Function; and
- Google Cloud Storage to store the location of the stream at time-out.
You can find and download the source code for this example at http://github.com/spireglobal/aviation.
First, an overview of the example.
Overview
This overview outlines the purpose of each file in this example, and which role it plays.
- `terraform.tf` contains the infrastructure definition for GCP. Using Terraform 0.13 you can create the Cloud Function, Storage bucket and Scheduler Job.
- `function/client.py` and `function/client_test.py` contain production-ready sample client code that wraps the v2/targets/stream API. The client exposes target updates via callbacks, and handles graceful disconnection to avoid duplicate target update delivery.
- `function/main.py` uses `function/client.py` and manages loading and storing position_tokens, which encode the position in the stream that the client has progressed to. This is also where the TargetProcessor class is located: this class processes target updates as they come in, and exposes a callback function to do so.
- `function/Pipfile` and `function/Pipfile.lock` define all Python package dependencies for the client and the handler, like the google package to write to Storage, the requests package to easily call network resources, and pytest for testing and development. Pip can be used inside the Pipenv virtual environment to export a list of installed requirements to `function/requirements.txt`, which Google Cloud Build uses to build the Cloud Function.
The next section lists all prerequisites, with links and notes on how to install them.
Prerequisites
To execute this tutorial you need the following accounts and tools on your system:
- A Google Cloud account with an "Organization", and "Billing" enabled;
- The Google Cloud SDK (`gcloud`) command installed and authenticated;
- Terraform 0.13 to create the GCP infrastructure;
- pyenv to install and load the correct Python version; and
- Git to download the source code.
Having these prerequisites in place you can walk through the next section to set up the example on your account.
Setup
The first part of this section contains and describes the necessary commands to create the infrastructure and deploy the example. The second part shows how to set up the development environment, and adapt the code to your needs.
Creating a GCP project
Retrieve and set an environment variable with your organization ID.
We will use this variable to create an empty project.
gcloud organizations list
export TF_VAR_org_id="<YOUR-ORG-ID>"
Create an empty project.
Retrieve the project ID and store it in an environment variable. Terraform will use this variable to know which project to create the services in.
gcloud projects create airsafe-v2-stream --organization $TF_VAR_org_id --set-as-default
gcloud projects list
export TF_VAR_project_id="<THE-airsafe-v2-stream-PROJECT-ID>"
Retrieve the billing accounts, and store the relevant one in an environment variable
gcloud alpha billing accounts list
export TF_VAR_billing_account="<YOUR-BILLING-ACCOUNT_ID>"
Enable Cloud Billing for your project, to allow us to use services that depend on it.
gcloud services enable cloudbilling.googleapis.com
gcloud beta billing projects link $TF_VAR_project_id --billing-account $TF_VAR_billing_account
Enable the services we will use further on. Cloud Billing is a requirement for Cloud Scheduler.
gcloud services enable iam.googleapis.com
gcloud services enable cloudfunctions.googleapis.com
gcloud services enable storage.googleapis.com
gcloud services enable cloudscheduler.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable cloudresourcemanager.googleapis.com
First we set up the project and environment such that Terraform can do the rest for us. We create a new, empty project, set up billing, and set environment variables that Terraform will use for variables in `terraform.tf`.
To start, clone git@github.com:spireglobal/aviation.git and navigate to the `google-cloud-functions` folder on the command line.
Setting up the project for Terraform
Create a terraform IAM service account
gcloud iam service-accounts create terraform --display-name "Terraform admin account"
Allow terraform to create infrastructure using these services
gcloud projects add-iam-policy-binding $TF_VAR_project_id --member "serviceAccount:terraform@$TF_VAR_project_id.iam.gserviceaccount.com" --role roles/editor
gcloud projects add-iam-policy-binding $TF_VAR_project_id --member "serviceAccount:terraform@$TF_VAR_project_id.iam.gserviceaccount.com" --role roles/cloudfunctions.admin
Set credentials location for our terraform service account
export TF_CREDS="$HOME/.config/gcloud/$TF_VAR_project_id-terraform-admin.json"
Create keys for the terraform service account, and set environment variables for `gcloud`.
gcloud iam service-accounts keys create "$TF_CREDS" --iam-account "terraform@$TF_VAR_project_id.iam.gserviceaccount.com"
export GOOGLE_APPLICATION_CREDENTIALS=$TF_CREDS
export GOOGLE_PROJECT=$TF_VAR_project_id
Set the region to create the infrastructure in, and make your AirSafe2 Stream token available to Terraform
export TF_VAR_airsafe2_token="<YOUR-AIRSAFE-2-STREAM-TOKEN>"
export TF_VAR_region="us-west2"
To use Terraform, the project needs a service account that is allowed to create new service instances and set Cloud Function IAM roles. Furthermore, `terraform.tf` makes use of variables, which are set in this section using environment variables with the prefix `TF_VAR_`.
Creating the infrastructure
Create a Google App Engine app. This is a requirement for Terraform to create a Cloud Scheduler Job.
gcloud app create --region=$TF_VAR_region
Initialize Terraform, and create and deploy the infrastructure.
terraform init
terraform apply .
Infrastructure creation might take a few minutes since Google Cloud Build will build and deploy the Cloud Function behind the scenes.
Discussion of the Result
In https://console.cloud.google.com/functions you should now see the streamer function. Click on it, and go to "View Logs" to see the function invoked regularly, logging incoming target updates, gracefully disconnecting, and reconnecting to the stream seamlessly, using position_tokens.
Note: Propagation of IAM roles and permissions might take a while, so the first one or two invocations of the Cloud Scheduler might fail; things will stabilize after a short while.
Note: The first call of the streamer Cloud Function will report that it can't find a valid position_token and starts without one. Following invocations will re-start from their respective previous position_token.
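The fallback described in the note can be sketched as follows, using the local filesystem in place of Cloud Storage purely for illustration (the real code reads the token from a Storage bucket, and the function name is an assumption):

```python
def load_last_position_token(path):
    """Return the stored position_token, or None on the first invocation."""
    try:
        with open(path) as f:
            token = f.read().strip()
            return token or None
    except FileNotFoundError:
        # No token yet: start the stream without one, at the latest position.
        return None
```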
Development
The core functionality for further processing, filtering or forwarding of target updates is located in `main.py` in the class `TargetProcessor`. This section shows how to modify and re-deploy that code.
For this example the target processor only logs target updates. For a real use-case the callback might add the target update to a list, to allow batch-processing after the time-out; it might forward the target update to a stream processor, or to a PubSub topic.
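A batch-collecting variant of the callback could look like the sketch below; `BatchingTargetProcessor` and its methods are hypothetical names for illustration, not the shipped class's API.

```python
class BatchingTargetProcessor:
    """Collect target updates cheaply; process them in bulk after time-out."""

    def __init__(self):
        self.batch = []

    def on_target_update(self, update):
        # Cheap O(1) append: defer the real work until after the time-out.
        self.batch.append(update)

    def flush(self):
        # Hand the collected batch to downstream processing (e.g. a PubSub
        # topic or a stream processor) and start a fresh batch.
        batch, self.batch = self.batch, []
        return batch
```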
To modify the Python code of the GCP Cloud Function (`main.py`) or the API wrapper (`client.py`), first install pyenv, then `cd` into the `function` directory.
- Execute `pyenv init` and follow the instructions, to get pyenv ready to load the correct Python version.
- Run `pyenv install $(cat .python-version)` to install the required Python version on your system.
- Run `pyenv shell $(cat .python-version)` to load this Python version into the active shell.
- Run `pip install pipenv` to install Pipenv into the active pyenv virtual environment.
- Run `pipenv --python $(cat .python-version)` to create a virtual environment for this project.
- Run `pipenv shell` to load it into the active shell.
- Run `pipenv sync --dev` to install development and production requirements.

`pipenv --venv` shows the virtual environment location, to correctly set the environment in your IDE.
Updating Dependencies for Deployment
Cloud Build for Cloud Functions installs dependencies that are specified in `requirements.txt`. To mirror the content of `Pipfile.lock` in `requirements.txt`, execute `pipenv --rm && pipenv sync && pip list --format=freeze > requirements.txt` in the pipenv shell.
Redeploying
`terraform.tf` is set up in a way that triggers a re-deployment on every execution of `terraform apply .`: the call to `timestamp()` in the streamer source name triggers a re-creation of that resource. So applying Terraform is enough to re-deploy new code.
Cleanup
To remove all resources you can do one of two things:
- Delete the project with `gcloud projects delete $TF_VAR_project_id`; or
- Delete only the terraform-created resources with `terraform destroy .`
Python in Docker
After following this tutorial you will have a working target update stream, consuming AirSafe /v2/targets/stream from Python, in a Docker container, without timeouts, and with the ability to reconnect to the stream, shortly before a disconnection event.
In this example we log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. At every position_token we receive, we write the token to the local file-system. The last token is written and then renamed to make the update operation atomic. This avoids corrupt or half-written position_tokens. In case of a disconnect the stream reconnects at the last available position_token, to avoid missing target updates.
This example is best used as a base for a persistent connection that is not expected to interrupt regularly, e.g. a stream on EC2 or Google Compute Engine. Here we do not timeout the connection, but keep it alive indefinitely, until either the server disconnects, or the consumer fails. Then the consumer can reconnect to the last known position_token to avoid missing target updates, but it will likely receive some duplicate target updates.
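The reconnect behaviour described above can be sketched as a loop with exponential backoff. `connect` and `load_token` are placeholder callables standing in for the real client and token store; `max_attempts` exists only to make the sketch testable (the real loop runs indefinitely).

```python
import time

def stream_forever(connect, load_token, max_attempts=None, backoff_s=1):
    """Keep the stream alive: on any disconnect, back off and resume from
    the last stored position_token (some duplicate updates are expected)."""
    attempts = 0
    backoff = backoff_s
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            # Blocks until the server disconnects or the client fails.
            connect(load_token())
            backoff = backoff_s  # healthy session: reset the backoff
        except Exception:
            time.sleep(backoff)  # wait before reconnecting
            backoff = min(backoff * 2, 60)
    return attempts
```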
You can find and download the source code for this example at http://github.com/spireglobal/aviation.
First, an overview of the example.
Overview
This overview outlines the purpose of each file in this example, and which role it plays.
- `client.py` and `client_test.py` contain production-ready sample client code that wraps the v2/targets/stream API. The client exposes target updates via callbacks, and handles graceful disconnection to avoid duplicate target update delivery.
- `main.py` uses `client.py` and manages loading and storing position_tokens, which encode the position in the stream that the client has progressed to. This is also where the TargetProcessor class is located: this class processes target updates as they come in, and exposes a callback function to do so.
- `Pipfile` and `Pipfile.lock` define all Python package dependencies for the client, like the requests package to easily call network resources, and pytest for testing and development.
- `Dockerfile` contains the Docker image definition, and specifies the command that will be called when starting a container.
The next section lists all prerequisites, with links and notes on how to install them.
Prerequisites
To execute this tutorial you need the following accounts and tools on your system:
- Docker to build the Docker image and run the Docker container;
- Git to download the source code; and
- pyenv to install and load the correct Python version, in case you want to modify it.
Having these prerequisites in place you can walk through the next section to set up the example on your account.
Setup
The first part of this section contains and describes the necessary commands to build the Docker image and run the example on your system. The second part shows how to set up the development environment, and adapt the code to your needs.
Building the Docker Image
Build and tag the Docker image
docker build . -t airsafe-2-stream-example
To build the Docker image, fetch the source code from http://github.com/spireglobal/aviation. In the terminal, navigate into the `docker` folder. In that folder, execute the docker build command.
In the next step we pass the AirSafe 2 stream token to the image, and mount a folder where the streaming process can persist position_tokens, to allow it to reconnect after potential failure.
Starting a Docker Container
Start the container locally, correctly setting the required environment variable and position_token mount point
docker run --rm -it -v $(pwd)/position_tokens:/root/position_tokens -e AIRSAFE2_TOKEN=$AIRSAFE2_TOKEN airsafe-2-stream-example
To start the container, prepare as follows.
- First, create a directory on the local file system to write position_tokens to, e.g. a folder called `position_tokens`.
- Second, export the AirSafe 2 Stream token you received from Spire as an environment variable, e.g. by executing `export AIRSAFE2_TOKEN=<YOUR-AIRSAFE2-TOKEN>`.

After these preparatory steps, execute the docker run command.
Discussion of the Result
After a few seconds you should start seeing a stream of logged target updates in the terminal where you started the Docker container. In a real system you would adapt the `TargetProcessor` class to do more than just log the incoming target updates.
In the folder `position_tokens` you should see a rolling list of timestamped position_tokens. The alphabetically last file contains the last position_token that has been sent by the stream. Looking into `main.py` you can see that the `PositionTokenProcessor` first writes incoming position_tokens timestamped with a 0 value and then renames them, which is an atomic operation on most systems. In case the write operation fails, the corrupt file will not interfere with the restart, since the restart reads the last available position_token, and ignores the previous ones.
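The write-then-rename pattern can be sketched like this; the file naming here is illustrative, not the actual scheme `PositionTokenProcessor` uses.

```python
import os
import time

def store_position_token(directory, token, now=time.time):
    """Persist a position_token atomically: write to a scratch name, then
    rename. os.replace is atomic on POSIX, so a reader never observes a
    partially written token file."""
    final = os.path.join(directory, "%017.6f" % now())
    tmp = final + ".tmp"
    with open(tmp, "w") as f:
        f.write(token)
    os.replace(tmp, final)
    return final
```

On restart, reading the alphabetically last completed file yields the most recent token, and any leftover `.tmp` file from a failed write is simply ignored.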
The stream will not disconnect unless the server disconnects or something fails on the client side.
This set-up can be called from a Linux service file to enable auto-reconnection after a disconnect or failure. The image can be built locally or in CI / CD, and Ansible can deploy the service file to load the image and start the container.
Note: The first start of the container will report that it can't find a valid position_token and starts without one, at the latest position in the stream. Following invocations will re-start from their respective previous position_token.
Development
The core functionality for further processing, filtering or forwarding of target updates is located in `main.py` in the class `TargetProcessor`. This section shows how to modify and re-deploy that code.
For this example the target processor only logs target updates. For a real use-case the callback might add the target update to a list, to allow batch-processing after the time-out; it might forward the target update to a stream processor, or to a PubSub topic.
To modify the Python code of the streamer (`main.py`) or the API wrapper (`client.py`), first install pyenv, then `cd` into the `docker` directory.
- Execute `pyenv init` and follow the instructions, to get pyenv ready to load the correct Python version.
- Run `pyenv install $(cat .python-version)` to install the required Python version on your system.
- Run `pyenv shell $(cat .python-version)` to load this Python version into the active shell.
- Run `pip install pipenv` to install Pipenv into the active pyenv virtual environment.
- Run `pipenv --python $(cat .python-version)` to create a virtual environment for this project.
- Run `pipenv shell` to load it into the active shell.
- Run `pipenv sync --dev` to install development and production requirements.

`pipenv --venv` shows the virtual environment location, to correctly set the environment in your IDE.
Cleanup
To stop the active container, press Ctrl + C.
To remove the created Docker image, execute `docker image rm airsafe-2-stream-example`.