Data Pipeline API
The Data Pipeline API allows Space Services Customers to download data from their payloads as well as receive data uploaded to their payloads. The API is exposed through the Spire Linux Agent.
Version: 1.0
Example
Payload Code
from oort_sdk_client import SdkApi
from oort_sdk_client.models import (
    SendFileRequest, SendOptions, TTLParams
)
from custom_application import (
    observe, process, save_to_file, create_logs
)
# these topic names will be provided by the Spire Constellation Ops team
topic_primary = "custom-application"
topic_logs = "custom-application-logs"
topic_raw = "custom-application-raw"
agent = SdkApi()
while True:
    raw_observation = observe()
    # on-board processing may be done to extract the most important data
    processed_observation = process(raw_observation)
    raw_filename = save_to_file(raw_observation)
    processed_filename = save_to_file(processed_observation)
    # send the important processed data with default options
    req = SendFileRequest(
        destination="ground",
        topic=topic_primary,
        filepath=processed_filename,
        options=SendOptions())
    resp = agent.send_file(req)
    # logfiles may be very useful, but not as critical as the important
    # data observations. Send those as "bulk" data
    # The hypothetical "create_logs" method would write any log files
    # in progress, and return a list of their filenames.
    log_files = create_logs()
    ttl = TTLParams(urgent=0, bulk=86400)
    options = SendOptions(ttl_params=ttl)
    for file in log_files:
        req = SendFileRequest(
            destination="ground",
            topic=topic_logs,
            filepath=file,
            options=options
        )
        agent.send_file(req)
    # the raw data may be much larger, but still useful if there is
    # time to transmit it. This data can be sent as "surplus"
    ttl = TTLParams(urgent=0, bulk=0, surplus=(86400 * 7))
    options = SendOptions(ttl_params=ttl)
    req = SendFileRequest(
        destination="ground",
        topic=topic_raw,
        filepath=raw_filename,
        options=options
    )
    agent.send_file(req)
On the satellite, the producer will typically perform some observation, post-process the recorded data (e.g., to remove duplicate data points or run custom compression), and then send the file.
The data pipeline applies a general-purpose compressor (e.g., gzip) to data that is not already compressed, so compressing on the payload is not always necessary. However, a compressor that has been tuned to the specific data is likely to perform better.
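As an illustration of what "tuned to the specific data" can mean, here is a minimal sketch that delta-encodes a series of integer timestamps before compressing them. The technique and the names `encode_timestamps` and `decode_timestamps` are illustrative assumptions, not part of the Data Pipeline API, and the actual gain depends entirely on the shape of your data.

```python
import struct
import zlib


def encode_timestamps(timestamps):
    """Delta-encode integer timestamps, then compress the result with zlib.

    Hypothetical helper: regularly spaced samples produce many identical
    deltas, which a general-purpose compressor handles very well.
    """
    timestamps = list(timestamps)
    if not timestamps:
        return zlib.compress(b"")
    # Keep the first value as-is, then store only the differences.
    deltas = [timestamps[0]]
    deltas += [b - a for a, b in zip(timestamps, timestamps[1:])]
    raw = struct.pack("<{}q".format(len(deltas)), *deltas)
    return zlib.compress(raw)


def decode_timestamps(blob):
    """Reverse encode_timestamps: decompress, then rebuild the running sum."""
    raw = zlib.decompress(blob)
    count = len(raw) // 8  # each delta is a signed 64-bit integer
    deltas = struct.unpack("<{}q".format(count), raw)
    values = []
    total = 0
    for delta in deltas:
        total += delta
        values.append(total)
    return values
```

The encoded bytes could be written to a file and passed to `send_file` like any other file. Because the result is already compressed, the ground-side consumer would need the matching decoder.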
Ground-side
After the data pipeline has transferred sent files to the ground, they are stored in S3 buckets, where they can be retrieved for further processing. The files will be delivered in the original format they were sent in, so if they were sent compressed, they will be stored compressed.
The topic the file was sent to determines the specific S3 bucket that a file will be uploaded to.
Data Structures
TTLParams
see SendFileRequest for usage example
OORT has three queues for any data topic: urgent, bulk, and surplus. Data in the urgent queue stage is transmitted first, followed by data in the bulk queue stage, followed by data in the surplus queue stage.
The developer may set a file's Time-to-live (TTL) for each stage individually. For lower priority data it is possible to skip the urgent stage and add the data directly to bulk, or skip both the urgent and bulk stages and add directly to surplus.
Time-to-live (TTL) parameters for a sent item are described below.
Members
Unless otherwise specified, all time values are in seconds.
Name | Type | Description | Default |
---|---|---|---|
urgent | int | TTL for urgent queue -- data here is transmitted first, and moved to bulk when TTL expires | 9000 (2.5 hours) |
bulk | int | TTL for bulk queue -- data here is transmitted second, and moved to surplus when TTL expires | 43200 (12 hours) |
surplus | int | TTL for surplus queue -- data here is transmitted third, and deleted when TTL expires | 172800 (48 hours) |
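The stage progression in the table can be sketched as a small model. This is only an illustration under a stated assumption: each stage's TTL clock starts when the item enters that stage, and the item is never transmitted in the meantime. The function `stage_at` is hypothetical and not part of the SDK.

```python
def stage_at(age_seconds, urgent=9000, bulk=43200, surplus=172800):
    """Return the queue stage an untransmitted item would occupy at a given age.

    Assumption (not confirmed by the API documentation): TTLs are consumed
    one stage after another, starting with urgent. A TTL of zero skips
    that stage entirely.
    """
    remaining = age_seconds
    for name, ttl in (("urgent", urgent), ("bulk", bulk), ("surplus", surplus)):
        if remaining < ttl:
            return name
        remaining -= ttl
    return "deleted"
```

For example, with the default values an item that has waited 10000 seconds has left the urgent stage and sits in bulk, while an item sent with `urgent=0` starts in bulk immediately.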
Data in all three queues is transmitted First In, First Out (FIFO). Use the following questions as a guideline for choosing TTL values for a given piece of data your payload produces:
1. Do you want this data type to skip the line and be downloaded ahead of the rest of your data? If no, set the urgent TTL to zero. If yes, set the urgent TTL to the number of seconds you want the skipping to be in effect.
2. Does this data at some point become stale, i.e., substantially less valuable to you? If no, you do not need to set the bulk TTL; the default value will be applied. If yes, set the bulk TTL to the number of seconds after which you consider the data stale.
3. Once this data becomes stale, do you still want to try to download it if there is remaining downlink budget for your payload? If yes, set the surplus TTL to the number of seconds you wish to continue trying before the data is deleted.
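The three guideline questions can be turned into a small helper. The sketch below is one possible reading, not an official utility: `choose_ttls` and its parameter names are hypothetical. The guideline does not say what to do when the answer to the third question is no, so this sketch simply leaves the surplus TTL unset in that case.

```python
def choose_ttls(skip_line_for=0, stale_after=None, keep_trying_for=None):
    """Translate the guideline answers into TTL keyword arguments.

    skip_line_for:   seconds the data should stay in the urgent queue
                     (0 means "do not skip the line").
    stale_after:     seconds after which the data is considered stale,
                     or None to keep the default bulk TTL.
    keep_trying_for: seconds to keep trying once the data is stale,
                     or None to leave the surplus TTL unset (assumption).
    """
    ttls = {"urgent": skip_line_for}
    if stale_after is not None:
        ttls["bulk"] = stale_after
    if keep_trying_for is not None:
        ttls["surplus"] = keep_trying_for
    return ttls
```

Since the examples in this document build `TTLParams` from the keywords `urgent`, `bulk`, and `surplus`, the result could be passed on as `TTLParams(**choose_ttls(stale_after=86400))`.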
SendOptions
see SendFileRequest for usage example
Options to apply to a send request
Members
Name | Type | Description | Default |
---|---|---|---|
TTLParams | TTLParams | time-to-live parameters | see TTLParams |
reliable | boolean | whether to send an item reliably, i.e., with retries | true |
SendFileRequest
#include "SdkAPI.h"
ttl_params_t *ttl;
send_options_t *send_options;
send_file_request_t *request;
/* for TTL Params */
int urgent = 9000;
int bulk = 43200;
int surplus = 172800;
/* for SendOptions */
int reliable = 1;
/* for SendFileRequest */
/* NB: all the strings are owned by the request object and will be freed
   with it, so they must be copied if they come from storage that cannot
   be freed, like static strings or argv.
*/
char *dest = strdup("ground");
char *topic = strdup("logs");
char *filename = strdup("/file/to/send");
ttl = ttl_params_create(urgent, bulk, surplus);
send_options = send_options_create(ttl, reliable);
request = send_file_request_create(dest, filename, topic, send_options);
/* ... use request ... */
send_file_request_free(request); /* frees all child structures */
from oort_sdk_client.models import (
TTLParams, SendOptions, SendFileRequest
)
ttl = TTLParams(urgent=9000, bulk=43200, surplus=172800)
send_options = SendOptions(ttl_params=ttl, reliable=True)
request = SendFileRequest(destination='ground', topic='logs',
                          filepath='/file/to/send',
                          options=send_options)
# ... use request ...
A request to send a file via the Data Pipeline API
Members
Name | Type | Description | Default |
---|---|---|---|
destination | string | the destination to send the file to | - |
filepath | string | The source file path. Must be absolute | - |
topic | string | The pipeline topic to send the file to | - |
options | SendOptions | The options to apply | see SendOptions |
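Because `filepath` must be absolute, it can be convenient to check the arguments before building a request. The following is a minimal sketch of such a pre-flight check. `check_send_args` is a hypothetical helper, and the checks reflect only the constraints stated in the table above plus the assumption that the file should already exist.

```python
import os


def check_send_args(destination, topic, filepath):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    if not destination:
        problems.append("destination is empty")
    if not topic:
        problems.append("topic is empty")
    if not os.path.isabs(filepath):
        problems.append("filepath is not absolute")
    elif not os.path.isfile(filepath):
        # Assumption: the file should exist before the request is made.
        problems.append("filepath is not an existing file")
    return problems
```

A caller might log the returned problems and skip the send when the list is non-empty.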
SendFileResponse
send_file_response_t *response;
/* create apiClient_t *client, send_file_request_t *request */
response = SdkAPI_sendFile(client, request);
if (response != NULL) {
    fprintf(stderr, "The file was sent with uuid %s\n", response->uuid);
    send_file_response_free(response);
}
# create SdkApi agent, SendFileRequest request
response = agent.send_file(request)
print('The file was sent with uuid {}'.format(response.uuid))
The response received from a send file request.
Members
Name | Type | Description | Default |
---|---|---|---|
UUID | string | the unique identifier for the file transfer | - |
FileInfo
see RetrieveFileRequest for example
Information about the file and the transfer request.
Members
Times are unix epoch times (i.e., seconds since Jan 1, 1970)
Name | Type | Description | Default |
---|---|---|---|
id | string | the UUID for the file transfer | - |
path | string | the original path this file was sent as | - |
size | integer | the original size of this file | - |
modified | integer | the time this file was most recently modified | - |
created | integer | the time this file was created | - |
crc32 | string | CRC-32 calculated for the file | - |
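The time and checksum fields can be used to sanity-check a retrieved file. The sketch below uses only the Python standard library. The helper names are hypothetical, and the text format of the `crc32` field is not specified here, so the 8-digit lowercase hexadecimal rendering is an assumption that may need adapting.

```python
import zlib
from datetime import datetime, timezone


def epoch_to_utc(seconds):
    """Convert a unix epoch time (as in `modified` or `created`) to a UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def crc32_hex_of_chunks(chunks):
    """Compute CRC-32 over an iterable of byte chunks, as 8 lowercase hex digits."""
    crc = 0
    for chunk in chunks:
        crc = zlib.crc32(chunk, crc)
    return "{:08x}".format(crc & 0xFFFFFFFF)


def crc32_hex_of_file(path, chunk_size=65536):
    """Compute CRC-32 of a file without loading it into memory at once."""
    with open(path, "rb") as handle:
        return crc32_hex_of_chunks(iter(lambda: handle.read(chunk_size), b""))
```

After a retrieval, the value from `crc32_hex_of_file(save_path)` could be compared with the reported `crc32`, once the reported format has been confirmed.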
AvailableFilesResponse
see RetrieveFileRequest for example
Response to available files query
Members
Name | Type | Description | Default |
---|---|---|---|
files | FileInfo[] | list of the available files | - |
overflow | boolean | true if there are more files available than could be returned in a single call | - |
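When `overflow` is true, one way to handle it is to query again after processing the files already returned. The sketch below assumes that a retrieved file no longer appears in later queries, which this document does not confirm, so the loop is capped and also stops when a round returns nothing new. `drain_topic` and `FakeAgent` are hypothetical. `FakeAgent` is only a stand-in for `SdkApi` so that the sketch runs without a satellite.

```python
from types import SimpleNamespace


def drain_topic(agent, topic, handle_file, max_rounds=10):
    """Query a topic repeatedly until a response reports no overflow."""
    seen = []
    for _ in range(max_rounds):
        response = agent.query_available_files(topic)
        new_files = [f for f in response.files if f.id not in seen]
        for info in new_files:
            handle_file(info)
            seen.append(info.id)
        # Stop when everything fit in one call, or nothing new arrived.
        if not response.overflow or not new_files:
            break
    return seen


class FakeAgent:
    """Demonstration stand-in: returns at most page_size files per query."""

    def __init__(self, file_ids, page_size):
        self.pending = list(file_ids)
        self.page_size = page_size

    def query_available_files(self, topic):
        page = self.pending[:self.page_size]
        files = [SimpleNamespace(id=i, path="/uplink/" + i) for i in page]
        overflow = len(self.pending) > self.page_size
        return SimpleNamespace(files=files, overflow=overflow)

    def retrieve(self, info):
        # Assumption: a retrieved file is no longer listed as available.
        self.pending.remove(info.id)
```

With the real client, `handle_file` would build a `RetrieveFileRequest` and call `agent.retrieve_file`, as in the RetrieveFileRequest example.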
RetrieveFileRequest
available_files_response_t *avail;
retrieve_file_request_t *request;
char *topic;
char *destdir = "/destination/directory";
topic = strdup("mytopic");
avail = SdkAPI_queryAvailableFiles(client, topic);
if (avail != NULL) {
    listEntry_t *item;
    fprintf(stderr, "Available files:\n");
    /* provided helper macros */
    list_ForEach(item, avail->files) {
        /* finfo belongs to avail and is released with it below,
           so it is not freed inside the loop */
        file_info_t *finfo = (file_info_t *)item->data;
        file_info_t *finfo2;
        char *destpath;
        size_t pathlen;
        fprintf(stderr, "%s: %s\n", finfo->id, finfo->path);
        /* the first call only measures; +1 leaves room for the NUL byte */
        pathlen = snprintf(NULL, 0, "%s/%s", destdir, basename(finfo->path));
        destpath = (char *)malloc(pathlen + 1);
        snprintf(destpath, pathlen + 1, "%s/%s", destdir, basename(finfo->path));
        request = retrieve_file_request_create(strdup(finfo->id), destpath);
        finfo2 = SdkAPI_retrieveFile(client, request);
        if (finfo2 != NULL) {
            fprintf(stderr, "retrieved %s\n", finfo2->id);
            file_info_free(finfo2);
        }
        retrieve_file_request_free(request);
    }
    available_files_response_free(avail);
}
import os.path
from oort_sdk_client import SdkApi
from oort_sdk_client.models import FileInfo, RetrieveFileRequest
agent = SdkApi()
topic = "mytopic"
available = agent.query_available_files(topic).files
if not available:
    print("No files available")
else:
    for f in available:
        print("Retrieving {}".format(f.id))
        req = RetrieveFileRequest(
            id=f.id,
            save_path="/tmp/{}".format(os.path.basename(f.path)))
        rfinfo = agent.retrieve_file(req)
        print("Retrieved {}".format(rfinfo.id))
Request to retrieve a file from the Data Pipeline API
Members
Name | Type | Description | Default |
---|---|---|---|
id | string | the UUID for the file transfer | - |
save_path | string | The absolute path for the file to be saved to. | - |
ErrorResponse
Any error returned from the Data Pipeline API.
Members
Name | Type | Description | Default |
---|---|---|---|
status | integer | status code for the error | - |
message | string | The error message | - |
Methods
Instantiation
from oort_sdk_client import SdkApi
agent = SdkApi()
#include "SdkAPI.h"
apiClient_t *client;
client = apiClient_create();
/* ... */
apiClient_free(client);
SdkAPI
Create a new Spire Linux Agent Client.
SendFile
Sends a file from the payload to the ground.
from oort_sdk_client.models import SendFileRequest
req = SendFileRequest(
    destination="ground",
    topic="my-topic",
    filepath="/path/to/file")
resp = agent.send_file(req)
print("File sent successfully, UUID is {}".format(resp.uuid))
send_file_request_t *req;
send_options_t *send_opt;
send_file_response_t *resp;
ttl_params_t *ttl;
ttl = ttl_params_create(urgent, bulk, surplus);
send_opt = send_options_create(ttl, reliable);
req = send_file_request_create(dest, filename, topic, send_opt);
resp = SdkAPI_sendFile(client, req);
if (resp != NULL) {
    fprintf(stderr, "resp uuid = %s", resp->uuid);
    send_file_response_free(resp);
}
send_file_request_free(req);
Send a file via the API.
Arguments
Type | Description |
---|---|
SendFileRequest | SendFileRequest Object |
Return value
Type | Description |
---|---|
SendFileResponse | Contains the UUID assigned for this file transfer |
QueryAvailableFiles
Queries files that have been uplinked from the ground to the payload
from oort_sdk_client import SdkApi
agent = SdkApi()
topic = "my-topic"
available = agent.query_available_files(topic)
for item in available.files:
    print("Available file: {item.id} -- {item.path}".format(item=item))
char *topic = NULL;
apiClient_t *client;
available_files_response_t *resp;
topic = strdup(argv[1]);
client = apiClient_create();
resp = SdkAPI_queryAvailableFiles(client, topic);
if (resp != NULL) {
    listEntry_t *item;
    fprintf(stderr, "Available files:\n");
    list_ForEach(item, resp->files) {
        file_info_t *finfo = (file_info_t *)item->data;
        fprintf(stderr, "%s: %s\n", finfo->id, finfo->path);
    }
    available_files_response_free(resp);
}
apiClient_free(client);
Query the Data Pipeline API for any files ready to be retrieved.
Arguments
Type | Description |
---|---|
string | The topic to check for available files. |
Return value
Type | Description |
---|---|
AvailableFilesResponse | AvailableFilesResponse Object |
RetrieveFile
Retrieve a file returned from QueryAvailableFiles
import os.path
from oort_sdk_client import SdkApi
from oort_sdk_client.models import RetrieveFileRequest
agent = SdkApi()
topic = "my-topic"
available = agent.query_available_files(topic)
if not available.files:
    print("No files available for retrieval")
else:
    for item in available.files:
        # named placeholders need keyword arguments
        print("Retrieving {id} ({path})".format(id=item.id, path=item.path))
        req = RetrieveFileRequest(
            id=item.id,
            save_path='/tmp/{}'.format(os.path.basename(item.path)))
        agent.retrieve_file(req)
char *filename = NULL, *id = NULL;
apiClient_t *client;
retrieve_file_request_t *req;
file_info_t *finfo;
if (argc != 3) {
    usage(argv[0]);
}
id = strdup(argv[1]);
filename = strdup(argv[2]);
client = apiClient_create();
req = retrieve_file_request_create(id, filename);
finfo = SdkAPI_retrieveFile(client, req);
if (finfo != NULL) {
    fprintf(stderr, "retrieved id = %s to %s",
            req->id, finfo->path);
    file_info_free(finfo);
}
retrieve_file_request_free(req);
apiClient_free(client);
Retrieve an available file from the Data Pipeline API.
Arguments
Type | Description |
---|---|
RetrieveFileRequest | RetrieveFileRequest Object |
Return value
Type | Description |
---|---|
FileInfo | Details about the file retrieved |
Errors
from oort_sdk_client import SdkApi, ApiException, ErrorResponse
from oort_sdk_client.models import SendFileRequest, RetrieveFileRequest
import sys
agent = SdkApi()
req = SendFileRequest(
    destination="ground",
    topic=sys.argv[1],
    filepath=sys.argv[2])
try:
    res = agent.send_file(req)
    print(res.uuid)
except ApiException as e:
    print("Error sending file: {}".format(e.body))
Errors are returned as an ErrorResponse value, which contains a status code and a message.
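To pull the two ErrorResponse fields out of an exception body, something like the sketch below could be used. It assumes the body is a JSON object with `status` and `message` keys, which this document does not state explicitly, and it falls back to the raw text when that assumption does not hold. `describe_error` is a hypothetical helper.

```python
import json


def describe_error(body):
    """Return (status, message) from an error body, or (None, raw text).

    Assumption: the body is a JSON object shaped like ErrorResponse.
    """
    try:
        data = json.loads(body)
    except (TypeError, ValueError):
        return None, str(body)
    if not isinstance(data, dict):
        return None, str(body)
    return data.get("status"), data.get("message")
```

In the example above, the `except` branch could call `describe_error(e.body)` and decide whether to retry based on the returned status.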