Data Pipeline API

The Data Pipeline API allows Space Services Customers to download data from their payloads as well as receive data uploaded to their payloads. The API is exposed through the Spire Linux Agent.

Version: 1.0

Example

Payload Code

from oort_sdk_client import SdkApi
from oort_sdk_client.models import (
    SendFileRequest, SendOptions, TTLParams
)

from custom_application import (
    observe, process, save_to_file, create_logs
)

# these topic names will be provided by the Spire Constellation Ops team
topic_primary = "custom-application"
topic_logs = "custom-application-logs"
topic_raw = "custom-application-raw"

agent = SdkApi()

while True:
    raw_observation = observe()

    # on-board processing may be done to extract the most important data
    processed_observation = process(raw_observation)

    raw_filename = save_to_file(raw_observation)
    processed_filename = save_to_file(processed_observation)

    # send the important processed data with default options
    req = SendFileRequest(
        destination="ground",
        topic=topic_primary,
        filepath=processed_filename,
        options=SendOptions())
    resp = agent.send_file(req)

    # logfiles may be very useful, but not as critical as the important 
    # data observations.  Send those as "bulk" data

    # The hypothetical "create_logs" method would write any log files
    # in progress, and return a list of their filenames.

    log_files = create_logs()
    ttl = TTLParams(urgent=0, bulk=86400)
    options = SendOptions(ttl_params=ttl)
    for file in log_files:
        req = SendFileRequest(
            destination="ground",
            topic=topic_logs,
            filepath=file,
            options=options
            )
        agent.send_file(req)

    # the raw data may be much larger, but still useful if there is 
    # time to transmit it.  This data can be sent as "surplus" 
    ttl = TTLParams(urgent=0, bulk=0, surplus=(86400 * 7))
    options = SendOptions(ttl_params=ttl)
    req = SendFileRequest(
        destination="ground",
        topic=topic_raw,
        filepath=raw_filename,
        options=options
        )
    agent.send_file(req)

On the satellite, the producer will typically perform some observation, post-process the recorded data (e.g., to remove duplicate data points or run custom compression), and then send the file.

The data pipeline applies a general-purpose compressor (e.g., gzip) to data that is not already compressed, so custom compression is not always necessary. However, a compressor that has been tuned to the specific data is likely to perform better.
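
As a minimal sketch of the post-processing step described above, the snippet below compresses a file with Python's standard `gzip` module before it would be handed to `send_file`. The helper name `compress_for_downlink` and the `.gz` suffix are illustrative assumptions and not part of the SDK; a compressor tuned to the payload's data would take the place of `gzip` here.

```python
import gzip
import os
import shutil
import tempfile


def compress_for_downlink(src_path):
    """Gzip src_path next to the original and return the new absolute path.

    Hypothetical helper, not part of oort_sdk_client.
    """
    dst_path = os.path.abspath(src_path) + ".gz"
    with open(src_path, "rb") as src, gzip.open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return dst_path


# Demonstration on a throwaway file with highly repetitive content.
workdir = tempfile.mkdtemp()
raw_path = os.path.join(workdir, "observation.dat")
with open(raw_path, "wb") as f:
    f.write(b"0123456789" * 1000)

compressed_path = compress_for_downlink(raw_path)
print(os.path.getsize(raw_path), "->", os.path.getsize(compressed_path))

# compressed_path would then be passed as SendFileRequest(filepath=...).
```

The returned path is absolute, which matches the requirement on `filepath` in SendFileRequest.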

Ground-side

After the data pipeline has transferred sent files to the ground, they are stored in S3 buckets, where they can be retrieved for further processing. The files will be delivered in the original format they were sent in, so if they were sent compressed, they will be stored compressed.

The topic the file was sent to determines the specific S3 bucket that a file will be uploaded to.

Data Structures

TTLParams

see SendFileRequest for usage example

OORT has three queues for any data topic: urgent, bulk, and surplus. Data in the urgent queue stage is transmitted first, followed by data in the bulk queue stage, followed by data in the surplus queue stage.

The developer may set a file's Time-to-live (TTL) for each stage individually. For lower priority data it is possible to skip the urgent stage and add the data directly to bulk, or skip both the urgent and bulk stages and add directly to surplus.
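
The stage progression can be modelled with a few lines of Python. This is only a sketch for reasoning about TTL choices, not SDK code: the function name `stage_for_age` is hypothetical, and it assumes that each TTL starts counting when the item enters that stage, which this document does not state explicitly.

```python
def stage_for_age(age_s, urgent, bulk, surplus):
    """Return the queue stage of an untransmitted item age_s seconds after sending.

    Assumption: each TTL counts from the moment the item enters that stage,
    so a TTL of 0 skips the stage entirely.
    """
    if age_s < urgent:
        return "urgent"
    if age_s < urgent + bulk:
        return "bulk"
    if age_s < urgent + bulk + surplus:
        return "surplus"
    return "deleted"


# An item that skips urgent, spends one day in bulk and one hour in surplus.
for age in (0, 86399, 86400, 90000):
    print(age, stage_for_age(age, urgent=0, bulk=86400, surplus=3600))
```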

Time-to-live (TTL) parameters for a sent item are described below.

Members

Unless otherwise specified, all time values are in seconds.

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| urgent | int | TTL for urgent queue -- data here is transmitted first, and moved to bulk when TTL expires | 9000 (2.5 hours) |
| bulk | int | TTL for bulk queue -- data here is transmitted second, and moved to surplus when TTL expires | 43200 (12 hours) |
| surplus | int | TTL for surplus queue -- data here is transmitted third, and deleted when TTL expires | 172800 (48 hours) |

Data in all three queues is transmitted First In First Out (FIFO). Below is a guideline for determining appropriate TTL values for a given piece of data your payload produces:

1. Do you want this data type to skip the line and be downloaded ahead of the rest of your data? If no, set the urgent TTL to zero. If yes, set the urgent TTL to the number of seconds you want the skipping to be in effect.
2. Does this data at some point become stale, i.e., substantially less valuable to you? If no, you don't need to set the bulk TTL; the default value will be applied. If yes, set the bulk TTL to the number of seconds after which you consider it stale.
3. Once this data becomes stale, do you still want to try to download it if there is remaining downlink budget for your payload? If yes, set the surplus TTL to the number of seconds you wish to continue trying before the data is deleted.
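
The guideline can be expressed as a small helper that turns the three answers into keyword arguments for `TTLParams`. This is a sketch, not part of the SDK: `choose_ttl` and its parameter names are hypothetical. The guideline does not say what to do when the answer to the third question is "no", so this sketch leaves `surplus` unset in that case and the default applies.

```python
def choose_ttl(skip_line_s=None, stale_after_s=None, surplus_s=None):
    """Translate the three guideline answers into TTLParams keyword arguments.

    Each argument is a number of seconds, or None when the answer to the
    corresponding question is "no".
    """
    ttl = {}
    # 1. Skip the line? No -> urgent TTL of zero.
    ttl["urgent"] = skip_line_s if skip_line_s is not None else 0
    # 2. Does the data go stale? No -> leave bulk unset so the default applies.
    if stale_after_s is not None:
        ttl["bulk"] = stale_after_s
    # 3. Keep trying once stale? Yes -> surplus TTL in seconds.
    #    Assumption: on "no", surplus is left unset (the default applies).
    if surplus_s is not None:
        ttl["surplus"] = surplus_s
    return ttl


# Log files that are stale after one day and should not skip the line.
print(choose_ttl(stale_after_s=86400))
```

The result would be passed on as `TTLParams(**choose_ttl(stale_after_s=86400))`, which matches the log-file case in the payload example.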

SendOptions

see SendFileRequest for usage example

Options to apply to a send request

Members

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ttl_params | TTLParams | time-to-live parameters | see TTLParams |
| reliable | boolean | whether to send an item reliably, i.e., with retries | true |

SendFileRequest

#include <string.h>  /* strdup */

#include "SdkAPI.h"

ttl_params_t *ttl;
send_options_t *send_options;
send_file_request_t *request;

/* for TTL Params */
int urgent = 9000;
int bulk = 43200;
int surplus = 172800;
/* for SendOptions */
int reliable = 1;
/* for SendFileRequest */
/* NB: all the strings are owned by the request object and will be freed
   with it, so they must be copied if they come from storage that cannot
   be freed, like static strings or argv.
 */

char *dest = strdup("ground");
char *topic = strdup("logs");
char *filename = strdup("/file/to/send");

ttl = ttl_params_create(urgent, bulk, surplus);
send_options = send_options_create(ttl, reliable);
request = send_file_request_create(dest, filename, topic, send_options);

/* ... use request ... */

send_file_request_free(request);  /* frees all child structures */

from oort_sdk_client.models import (
    TTLParams, SendOptions, SendFileRequest
)


ttl = TTLParams(urgent=9000, bulk=43200, surplus=172800)
send_options = SendOptions(ttl_params=ttl, reliable=True)
request = SendFileRequest(destination='ground', topic='logs',
                          filepath='/file/to/send',
                          options=send_options)

# ... use request ...

A request to send a file via the Data Pipeline API

Members

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| destination | string | the destination to send the file to | - |
| filepath | string | The source file path. Must be absolute | - |
| topic | string | The pipeline topic to send the file to | - |
| options | SendOptions | The options to apply | see SendOptions |
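
Because `filepath` must be absolute, it can help to check the path before building the request. The helper below is a hypothetical sketch (`send_file_kwargs` is not an SDK function); it only prepares the keyword arguments and leaves the construction of `SendFileRequest` to the caller.

```python
import os.path


def send_file_kwargs(topic, filepath, destination="ground"):
    """Build keyword arguments for SendFileRequest, rejecting relative paths."""
    if not os.path.isabs(filepath):
        raise ValueError("filepath must be absolute: {!r}".format(filepath))
    return {"destination": destination, "topic": topic, "filepath": filepath}


kwargs = send_file_kwargs("logs", "/file/to/send")
print(kwargs)

# With the SDK installed this would continue as:
#   request = SendFileRequest(**kwargs)
```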

SendFileResponse


send_file_response_t *response;

/* create apiClient_t *client, send_file_request_t *request */

response = SdkAPI_sendFile(client, request);

if (response != NULL) {
    fprintf(stderr, "The file was sent with uuid %s\n", response->uuid);
    send_file_response_free(response);
}


# create SdkApi agent, SendFileRequest request

response = agent.send_file(request)
print('The file was sent with uuid {}'.format(response.uuid))

The response received from a send file request.

Members

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uuid | string | the unique identifier for the file transfer | - |

FileInfo

see RetrieveFileRequest for example

Information about the file and the transfer request.

Members

Times are Unix epoch times (i.e., seconds since January 1, 1970 UTC).

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | string | the UUID for the file transfer | - |
| path | string | the original path this file was sent as | - |
| size | integer | the original size of this file | - |
| modified | integer | the time this file was most recently modified | - |
| created | integer | the time this file was created | - |
| crc32 | string | CRC-32 calculated for the file | - |
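
The following sketch shows one way to work with these fields after a file has been retrieved: computing a CRC-32 of the saved file with Python's `zlib` module and converting an epoch time to a UTC `datetime`. The helper names are hypothetical. This document does not specify how the `crc32` string is encoded, so the 8-digit lowercase hexadecimal form used here is an assumption that should be checked against a real `FileInfo` before comparing values.

```python
import os
import tempfile
import zlib
from datetime import datetime, timezone


def crc32_hex(path, chunk_size=65536):
    """CRC-32 of a file as 8 lowercase hex digits (encoding is an assumption)."""
    crc = 0
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            crc = zlib.crc32(chunk, crc)
    return "{:08x}".format(crc & 0xFFFFFFFF)


def epoch_to_utc(seconds):
    """Convert a FileInfo time (seconds since the Unix epoch) to a UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


# Demonstration on a temporary file holding the standard CRC test string.
fd, sample_path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"123456789")

print(crc32_hex(sample_path))          # cbf43926
print(epoch_to_utc(86400).isoformat())  # 1970-01-02T00:00:00+00:00
```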

AvailableFilesResponse

see RetrieveFileRequest for example

Response to available files query

Members

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| files | FileInfo[] | list of the available files | - |
| overflow | boolean | true if there are more files available than could be returned in a single call | - |
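
When `overflow` is true, a single query has not listed everything. One possible way to handle this is to retrieve the listed files and query again until `overflow` is false. The sketch below assumes that a retrieved file no longer appears in later queries, which this document does not state. `drain_topic` and `FakeAgent` are hypothetical names; `FakeAgent` is only a stand-in so that the loop can be run without the SDK.

```python
from types import SimpleNamespace


def drain_topic(agent, topic, make_request, max_rounds=100):
    """Retrieve every available file, re-querying while overflow is set.

    Assumption: a retrieved file no longer appears in later queries.
    make_request builds a RetrieveFileRequest (or stand-in) from a FileInfo.
    """
    retrieved = []
    for _ in range(max_rounds):  # bounded so a stuck listing cannot loop forever
        response = agent.query_available_files(topic)
        for info in response.files:
            agent.retrieve_file(make_request(info))
            retrieved.append(info.id)
        if not response.overflow:
            break
    return retrieved


class FakeAgent:
    """Stand-in for SdkApi that lists at most two files per query."""

    def __init__(self, ids):
        self.pending = list(ids)

    def query_available_files(self, topic):
        page = [SimpleNamespace(id=i, path="/uplink/" + i)
                for i in self.pending[:2]]
        return SimpleNamespace(files=page, overflow=len(self.pending) > 2)

    def retrieve_file(self, request):
        self.pending.remove(request.id)
        return SimpleNamespace(id=request.id)


agent = FakeAgent(["a", "b", "c", "d", "e"])
ids = drain_topic(
    agent, "my-topic",
    lambda info: SimpleNamespace(id=info.id, save_path="/tmp/" + info.id))
print(ids)
```

With the real SDK, `agent` would be an `SdkApi` instance and `make_request` would return a `RetrieveFileRequest`.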

RetrieveFileRequest


/* basename() is declared in <libgen.h> */

available_files_response_t *avail;
retrieve_file_request_t *request;

char *topic;

topic = strdup("mytopic");

avail = SdkAPI_queryAvailableFiles(client, topic);

char *destdir = "/destination/directory";

if (avail != NULL) {
    listEntry_t *item;
    fprintf(stderr, "Available files:\n");
    /* provided helper macros */
    list_ForEach(item, avail->files) {
        file_info_t *finfo = (file_info_t *)item->data;
        file_info_t *finfo2;
        char *destpath;
        size_t pathlen;
        fprintf(stderr, "%s: %s\n", finfo->id, finfo->path);
        /* the first call only measures the length, without the trailing NUL */
        pathlen = snprintf(NULL, 0, "%s/%s", destdir, basename(finfo->path));
        destpath = (char *)malloc(pathlen + 1);
        snprintf(destpath, pathlen + 1, "%s/%s", destdir, basename(finfo->path));
        /* the request takes ownership of both strings */
        request = retrieve_file_request_create(strdup(finfo->id), destpath);
        finfo2 = SdkAPI_retrieveFile(client, request);
        if (finfo2 != NULL) {
            fprintf(stderr, "retrieved %s\n", finfo2->id);
            file_info_free(finfo2);
        }
        /* finfo is owned by avail and is released with it below */
        retrieve_file_request_free(request);
    }
    available_files_response_free(avail);
}

import os.path

from oort_sdk_client import SdkApi
from oort_sdk_client.models import RetrieveFileRequest

agent = SdkApi()
thetopic = "mytopic"

available = agent.query_available_files(thetopic).files
if not available:
    print("No files available")
else:
    for f in available:
        print("Retrieving {}".format(f.id))
        req = RetrieveFileRequest(
            id=f.id,
            save_path="/tmp/{}".format(os.path.basename(f.path)))
        rfinfo = agent.retrieve_file(req)
        print("Retrieved {}".format(rfinfo.id))

Request to retrieve a file from the Data Pipeline API

Members

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | string | the UUID for the file transfer | - |
| save_path | string | The absolute path for the file to be saved to. | - |

ErrorResponse

Any error returned from the Data Pipeline API.

Members

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| status | integer | status code for the error | - |
| message | string | The error message | - |

Methods

Instantiation

from oort_sdk_client import SdkApi

agent = SdkApi()

    #include "SdkAPI.h"

    apiClient_t *client;

    client = apiClient_create();

    /* ... */

    apiClient_free(client);

SdkAPI

Create a new Spire Linux Agent Client.

SendFile

Sends a file from the payload to the ground.

from oort_sdk_client import SdkApi
from oort_sdk_client.models import SendFileRequest

agent = SdkApi()

req = SendFileRequest(
    destination="ground",
    topic="my-topic",
    filepath="/path/to/file")
resp = agent.send_file(req)

print("File sent successfully, UUID is {}".format(resp.uuid))

    send_file_request_t *req;
    send_options_t *send_opt;
    send_file_response_t *resp;
    ttl_params_t *ttl;

    ttl = ttl_params_create(urgent, bulk, surplus);
    send_opt = send_options_create(ttl, reliable);

    req = send_file_request_create(dest, filename, topic, send_opt);

    resp = SdkAPI_sendFile(client, req);
    if (resp != NULL) {
        fprintf(stderr, "resp uuid = %s", resp->uuid);
        send_file_response_free(resp);
    }

    send_file_request_free(req);

Send a file via the API.

Arguments

| Type | Description |
| --- | --- |
| SendFileRequest | SendFileRequest Object |

Return value

| Type | Description |
| --- | --- |
| SendFileResponse | Contains the UUID assigned for this file transfer |

QueryAvailableFiles

Queries files that have been uplinked from the ground to the payload

from oort_sdk_client import SdkApi

agent = SdkApi()

topic = "my-topic"

available = agent.query_available_files(topic)

for item in available.files:
    print("Available file: {item.id} -- {item.path}".format(item=item))

    char *topic = NULL;
    apiClient_t *client;
    available_files_response_t *resp;

    topic = strdup(argv[1]);
    client = apiClient_create();

    resp = SdkAPI_queryAvailableFiles(client, topic);
    if (resp != NULL) {
        listEntry_t *item;
        fprintf(stderr, "Available files:\n");
        list_ForEach(item, resp->files) {
            file_info_t *finfo = (file_info_t *)item->data;
            fprintf(stderr, "%s: %s\n", finfo->id, finfo->path);
        }
        available_files_response_free(resp);
    }

    apiClient_free(client);

Query the Data Pipeline API for any files ready to be retrieved.

Arguments

| Type | Description |
| --- | --- |
| string | The topic to check for available files. |

Return value

| Type | Description |
| --- | --- |
| AvailableFilesResponse | AvailableFilesResponse Object |

RetrieveFile

Retrieve a file returned from QueryAvailableFiles

import os.path

from oort_sdk_client import SdkApi
from oort_sdk_client.models import RetrieveFileRequest

agent = SdkApi()

topic = "my-topic"

available = agent.query_available_files(topic)
if not available.files:
    print("No files available for retrieval")
else:
    for item in available.files:
        print("Retrieving {id} ({path})".format(id=item.id, path=item.path))
        req = RetrieveFileRequest(
            id=item.id,
            save_path='/tmp/{}'.format(os.path.basename(item.path)))
        agent.retrieve_file(req)

    char *filename = NULL, *id = NULL;
    apiClient_t *client;
    retrieve_file_request_t *req;
    file_info_t *finfo;

    if (argc != 3) {
        usage(argv[0]);
    }
    id = strdup(argv[1]);
    filename = strdup(argv[2]);

    client = apiClient_create();

    req = retrieve_file_request_create(id, filename);

    finfo = SdkAPI_retrieveFile(client, req);
    if (finfo != NULL) {
        fprintf(stderr, "retrieved id = %s to %s",
               req->id, finfo->path);
        file_info_free(finfo);
    }

    retrieve_file_request_free(req);
    apiClient_free(client);

Retrieve an available file from the Data Pipeline API.

Arguments

| Type | Description |
| --- | --- |
| RetrieveFileRequest | RetrieveFileRequest Object |

Return value

| Type | Description |
| --- | --- |
| FileInfo | Details about the file retrieved |

Errors

from oort_sdk_client import SdkApi, ApiException, ErrorResponse
from oort_sdk_client.models import SendFileRequest, RetrieveFileRequest

import sys

agent = SdkApi()

req = SendFileRequest(
    destination="ground",
    topic=sys.argv[1],
    filepath=sys.argv[2])

try:
    res = agent.send_file(req)
    print(res.uuid)
except ApiException as e:
    print("Error sending file: {}".format(e.body))

Errors are returned as an ErrorResponse value, which contains a status code and a message.
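
To get at the individual fields, the body of the exception can be decoded. The sketch below assumes that `e.body` holds a JSON-encoded ErrorResponse with `status` and `message` keys, which this document does not confirm, so it falls back to the raw text when the body is not JSON. `describe_error` is a hypothetical helper, and the sample message is made up for illustration.

```python
import json


def describe_error(body):
    """Return (status, message) from an error body, or (None, raw text).

    Assumption: the body is a JSON object with "status" and "message" keys.
    """
    try:
        data = json.loads(body)
    except (TypeError, ValueError):
        # Not JSON (or not a string at all): hand back the raw content.
        return None, str(body)
    if not isinstance(data, dict):
        return None, str(body)
    return data.get("status"), data.get("message")


# Illustrative body only; real messages come from the Spire Linux Agent.
status, message = describe_error('{"status": 404, "message": "no such topic"}')
print(status, message)
```

Inside the `except ApiException as e:` branch this would be called as `describe_error(e.body)`.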