Run query - Amazon Timestream (original) (raw)
Topics
When you run a query, Timestream returns the result set in a paginated manner to optimize the responsiveness of your applications. The code snippet below shows how you can paginate through the result set. You must loop through all the result set pages until you encounter a null value. Pagination tokens expire 3 hours after being issued by Timestream for LiveAnalytics.
Note
These code snippets are based on full sample applications on GitHub. For more information about how to get started with the sample applications, see Sample application.
Java
private void runQuery(String queryString) {
try {
QueryRequest queryRequest = new QueryRequest();
queryRequest.setQueryString(queryString);
QueryResult queryResult = queryClient.query(queryRequest);
while (true) {
parseQueryResult(queryResult);
if (queryResult.getNextToken() == null) {
break;
}
queryRequest.setNextToken(queryResult.getNextToken());
queryResult = queryClient.query(queryRequest);
}
} catch (Exception e) {
// Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
e.printStackTrace();
}
}
Java v2
private void runQuery(String queryString) {
try {
QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build();
final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest);
for(QueryResponse queryResponse : queryResponseIterator) {
parseQueryResult(queryResponse);
}
} catch (Exception e) {
// Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
e.printStackTrace();
}
}
Go
func runQuery(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) {
queryInput := ×treamquery.QueryInput{
QueryString: aws.String(*queryPtr),
}
fmt.Println("QueryInput:")
fmt.Println(queryInput)
// execute the query
err := querySvc.QueryPages(queryInput,
func(page *timestreamquery.QueryOutput, lastPage bool) bool {
// process query response
queryStatus := page.QueryStatus
fmt.Println("Current query status:", queryStatus)
// query response metadata
// includes column names and types
metadata := page.ColumnInfo
// fmt.Println("Metadata:")
fmt.Println(metadata)
header := ""
for i := 0; i < len(metadata); i++ {
header += *metadata[i].Name
if i != len(metadata)-1 {
header += ", "
}
}
write(f, header)
// query response data
fmt.Println("Data:")
// process rows
rows := page.Rows
for i := 0; i < len(rows); i++ {
data := rows[i].Data
value := processRowType(data, metadata)
fmt.Println(value)
write(f, value)
}
fmt.Println("Number of rows:", len(page.Rows))
return true
})
if err != nil {
fmt.Println("Error:")
fmt.Println(err)
}
}
Python
def run_query(self, query_string):
try:
page_iterator = self.paginator.paginate(QueryString=query_string)
for page in page_iterator:
self._parse_query_result(page)
except Exception as err:
print("Exception while running query:", err)
Node.js
The following snippet uses the AWS SDK for JavaScript V2 style. It is based on the sample application at Node.js sample Amazon Timestream for LiveAnalytics application on GitHub.
async function getAllRows(query, nextToken) {
const params = {
QueryString: query
};
if (nextToken) {
params.NextToken = nextToken;
}
await queryClient.query(params).promise()
.then(
(response) => {
parseQueryResult(response);
if (response.NextToken) {
getAllRows(query, response.NextToken);
}
},
(err) => {
console.error("Error while querying:", err);
});
}
.NET
private async Task RunQueryAsync(string queryString)
{
try
{
QueryRequest queryRequest = new QueryRequest();
queryRequest.QueryString = queryString;
QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest);
while (true)
{
ParseQueryResult(queryResponse);
if (queryResponse.NextToken == null)
{
break;
}
queryRequest.NextToken = queryResponse.NextToken;
queryResponse = await queryClient.QueryAsync(queryRequest);
}
} catch(Exception e)
{
// Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
Console.WriteLine(e.ToString());
}
}
Parsing result sets
You can use the following code snippets to extract data from the result set. Query results are accessible for up to 24 hours after a query completes.
Note
These code snippets are based on full sample applications on GitHub. For more information about how to get started with the sample applications, see Sample application.
Java
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSSSS");
private static final long ONE_GB_IN_BYTES = 1073741824L;
private void parseQueryResult(QueryResult response) {
final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus();
System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%");
double bytesScannedSoFar = ((double) currentStatusOfQuery.getCumulativeBytesScanned() / ONE_GB_IN_BYTES);
System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB");
double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES);
System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");
List<ColumnInfo> columnInfo = response.getColumnInfo();
List<Row> rows = response.getRows();
System.out.println("Metadata: " + columnInfo);
System.out.println("Data: ");
// iterate every row
for (Row row : rows) {
System.out.println(parseRow(columnInfo, row));
}
}
private String parseRow(List<ColumnInfo> columnInfo, Row row) {
List<Datum> data = row.getData();
List<String> rowOutput = new ArrayList<>();
// iterate every column per row
for (int j = 0; j < data.size(); j++) {
ColumnInfo info = columnInfo.get(j);
Datum datum = data.get(j);
rowOutput.add(parseDatum(info, datum));
}
return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
private String parseDatum(ColumnInfo info, Datum datum) {
if (datum.isNullValue() != null && datum.isNullValue()) {
return info.getName() + "=" + "NULL";
}
Type columnType = info.getType();
// If the column is of TimeSeries Type
if (columnType.getTimeSeriesMeasureValueColumnInfo() != null) {
return parseTimeSeries(info, datum);
}
// If the column is of Array Type
else if (columnType.getArrayColumnInfo() != null) {
List<Datum> arrayValues = datum.getArrayValue();
return info.getName() + "=" + parseArray(info.getType().getArrayColumnInfo(), arrayValues);
}
// If the column is of Row Type
else if (columnType.getRowColumnInfo() != null) {
List<ColumnInfo> rowColumnInfo = info.getType().getRowColumnInfo();
Row rowValues = datum.getRowValue();
return parseRow(rowColumnInfo, rowValues);
}
// If the column is of Scalar Type
else {
return parseScalarType(info, datum);
}
}
private String parseTimeSeries(ColumnInfo info, Datum datum) {
List<String> timeSeriesOutput = new ArrayList<>();
for (TimeSeriesDataPoint dataPoint : datum.getTimeSeriesValue()) {
timeSeriesOutput.add("{time=" + dataPoint.getTime() + ", value=" +
parseDatum(info.getType().getTimeSeriesMeasureValueColumnInfo(), dataPoint.getValue()) + "}");
}
return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
private String parseScalarType(ColumnInfo info, Datum datum) {
switch (ScalarType.fromValue(info.getType().getScalarType())) {
case VARCHAR:
return parseColumnName(info) + datum.getScalarValue();
case BIGINT:
Long longValue = Long.valueOf(datum.getScalarValue());
return parseColumnName(info) + longValue;
case INTEGER:
Integer intValue = Integer.valueOf(datum.getScalarValue());
return parseColumnName(info) + intValue;
case BOOLEAN:
Boolean booleanValue = Boolean.valueOf(datum.getScalarValue());
return parseColumnName(info) + booleanValue;
case DOUBLE:
Double doubleValue = Double.valueOf(datum.getScalarValue());
return parseColumnName(info) + doubleValue;
case TIMESTAMP:
return parseColumnName(info) + LocalDateTime.parse(datum.getScalarValue(), TIMESTAMP_FORMATTER);
case DATE:
return parseColumnName(info) + LocalDate.parse(datum.getScalarValue(), DATE_FORMATTER);
case TIME:
return parseColumnName(info) + LocalTime.parse(datum.getScalarValue(), TIME_FORMATTER);
case INTERVAL_DAY_TO_SECOND:
case INTERVAL_YEAR_TO_MONTH:
return parseColumnName(info) + datum.getScalarValue();
case UNKNOWN:
return parseColumnName(info) + datum.getScalarValue();
default:
throw new IllegalArgumentException("Given type is not valid: " + info.getType().getScalarType());
}
}
private String parseColumnName(ColumnInfo info) {
return info.getName() == null ? "" : info.getName() + "=";
}
private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) {
List<String> arrayOutput = new ArrayList<>();
for (Datum datum : arrayValues) {
arrayOutput.add(parseDatum(arrayColumnInfo, datum));
}
return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L;
private void parseQueryResult(QueryResponse response) {
final QueryStatus currentStatusOfQuery = response.queryStatus();
System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%");
double bytesScannedSoFar = ((double) currentStatusOfQuery.cumulativeBytesScanned() / ONE_GB_IN_BYTES);
System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB");
double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES);
System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");
List<ColumnInfo> columnInfo = response.columnInfo();
List<Row> rows = response.rows();
System.out.println("Metadata: " + columnInfo);
System.out.println("Data: ");
// iterate every row
for (Row row : rows) {
System.out.println(parseRow(columnInfo, row));
}
}
private String parseRow(List<ColumnInfo> columnInfo, Row row) {
List<Datum> data = row.data();
List<String> rowOutput = new ArrayList<>();
// iterate every column per row
for (int j = 0; j < data.size(); j++) {
ColumnInfo info = columnInfo.get(j);
Datum datum = data.get(j);
rowOutput.add(parseDatum(info, datum));
}
return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
private String parseDatum(ColumnInfo info, Datum datum) {
if (datum.nullValue() != null && datum.nullValue()) {
return info.name() + "=" + "NULL";
}
Type columnType = info.type();
// If the column is of TimeSeries Type
if (columnType.timeSeriesMeasureValueColumnInfo() != null) {
return parseTimeSeries(info, datum);
}
// If the column is of Array Type
else if (columnType.arrayColumnInfo() != null) {
List<Datum> arrayValues = datum.arrayValue();
return info.name() + "=" + parseArray(info.type().arrayColumnInfo(), arrayValues);
}
// If the column is of Row Type
else if (columnType.rowColumnInfo() != null && columnType.rowColumnInfo().size() > 0) {
List<ColumnInfo> rowColumnInfo = info.type().rowColumnInfo();
Row rowValues = datum.rowValue();
return parseRow(rowColumnInfo, rowValues);
}
// If the column is of Scalar Type
else {
return parseScalarType(info, datum);
}
}
private String parseTimeSeries(ColumnInfo info, Datum datum) {
List<String> timeSeriesOutput = new ArrayList<>();
for (TimeSeriesDataPoint dataPoint : datum.timeSeriesValue()) {
timeSeriesOutput.add("{time=" + dataPoint.time() + ", value=" +
parseDatum(info.type().timeSeriesMeasureValueColumnInfo(), dataPoint.value()) + "}");
}
return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
private String parseScalarType(ColumnInfo info, Datum datum) {
return parseColumnName(info) + datum.scalarValue();
}
private String parseColumnName(ColumnInfo info) {
return info.name() == null ? "" : info.name() + "=";
}
private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) {
List<String> arrayOutput = new ArrayList<>();
for (Datum datum : arrayValues) {
arrayOutput.add(parseDatum(arrayColumnInfo, datum));
}
return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
Go
func processScalarType(data *timestreamquery.Datum) string {
return *data.ScalarValue
}
func processTimeSeriesType(data []*timestreamquery.TimeSeriesDataPoint, columnInfo *timestreamquery.ColumnInfo) string {
value := ""
for k := 0; k < len(data); k++ {
time := data[k].Time
value += *time + ":"
if columnInfo.Type.ScalarType != nil {
value += processScalarType(data[k].Value)
} else if columnInfo.Type.ArrayColumnInfo != nil {
value += processArrayType(data[k].Value.ArrayValue, columnInfo.Type.ArrayColumnInfo)
} else if columnInfo.Type.RowColumnInfo != nil {
value += processRowType(data[k].Value.RowValue.Data, columnInfo.Type.RowColumnInfo)
} else {
fail("Bad data type")
}
if k != len(data)-1 {
value += ", "
}
}
return value
}
func processArrayType(datumList []*timestreamquery.Datum, columnInfo *timestreamquery.ColumnInfo) string {
value := ""
for k := 0; k < len(datumList); k++ {
if columnInfo.Type.ScalarType != nil {
value += processScalarType(datumList[k])
} else if columnInfo.Type.TimeSeriesMeasureValueColumnInfo != nil {
value += processTimeSeriesType(datumList[k].TimeSeriesValue, columnInfo.Type.TimeSeriesMeasureValueColumnInfo)
} else if columnInfo.Type.ArrayColumnInfo != nil {
value += "["
value += processArrayType(datumList[k].ArrayValue, columnInfo.Type.ArrayColumnInfo)
value += "]"
} else if columnInfo.Type.RowColumnInfo != nil {
value += "["
value += processRowType(datumList[k].RowValue.Data, columnInfo.Type.RowColumnInfo)
value += "]"
} else {
fail("Bad column type")
}
if k != len(datumList)-1 {
value += ", "
}
}
return value
}
func processRowType(data []*timestreamquery.Datum, metadata []*timestreamquery.ColumnInfo) string {
value := ""
for j := 0; j < len(data); j++ {
if metadata[j].Type.ScalarType != nil {
// process simple data types
value += processScalarType(data[j])
} else if metadata[j].Type.TimeSeriesMeasureValueColumnInfo != nil {
// fmt.Println("Timeseries measure value column info")
// fmt.Println(metadata[j].Type.TimeSeriesMeasureValueColumnInfo.Type)
datapointList := data[j].TimeSeriesValue
value += "["
value += processTimeSeriesType(datapointList, metadata[j].Type.TimeSeriesMeasureValueColumnInfo)
value += "]"
} else if metadata[j].Type.ArrayColumnInfo != nil {
columnInfo := metadata[j].Type.ArrayColumnInfo
// fmt.Println("Array column info")
// fmt.Println(columnInfo)
datumList := data[j].ArrayValue
value += "["
value += processArrayType(datumList, columnInfo)
value += "]"
} else if metadata[j].Type.RowColumnInfo != nil {
columnInfo := metadata[j].Type.RowColumnInfo
datumList := data[j].RowValue.Data
value += "["
value += processRowType(datumList, columnInfo)
value += "]"
} else {
panic("Bad column type")
}
// comma seperated column values
if j != len(data)-1 {
value += ", "
}
}
return value
}
Python
def _parse_query_result(self, query_result):
query_status = query_result["QueryStatus"]
progress_percentage = query_status["ProgressPercentage"]
print(f"Query progress so far: {progress_percentage}%")
bytes_scanned = float(query_status["CumulativeBytesScanned"]) / ONE_GB_IN_BYTES
print(f"Data scanned so far: {bytes_scanned} GB")
bytes_metered = float(query_status["CumulativeBytesMetered"]) / ONE_GB_IN_BYTES
print(f"Data metered so far: {bytes_metered} GB")
column_info = query_result['ColumnInfo']
print("Metadata: %s" % column_info)
print("Data: ")
for row in query_result['Rows']:
print(self._parse_row(column_info, row))
def _parse_row(self, column_info, row):
data = row['Data']
row_output = []
for j in range(len(data)):
info = column_info[j]
datum = data[j]
row_output.append(self._parse_datum(info, datum))
return "{%s}" % str(row_output)
def _parse_datum(self, info, datum):
if datum.get('NullValue', False):
return "%s=NULL" % info['Name'],
column_type = info['Type']
# If the column is of TimeSeries Type
if 'TimeSeriesMeasureValueColumnInfo' in column_type:
return self._parse_time_series(info, datum)
# If the column is of Array Type
elif 'ArrayColumnInfo' in column_type:
array_values = datum['ArrayValue']
return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values))
# If the column is of Row Type
elif 'RowColumnInfo' in column_type:
row_column_info = info['Type']['RowColumnInfo']
row_values = datum['RowValue']
return self._parse_row(row_column_info, row_values)
# If the column is of Scalar Type
else:
return self._parse_column_name(info) + datum['ScalarValue']
def _parse_time_series(self, info, datum):
time_series_output = []
for data_point in datum['TimeSeriesValue']:
time_series_output.append("{time=%s, value=%s}"
% (data_point['Time'],
self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
data_point['Value'])))
return "[%s]" % str(time_series_output)
def _parse_array(self, array_column_info, array_values):
array_output = []
for datum in array_values:
array_output.append(self._parse_datum(array_column_info, datum))
return "[%s]" % str(array_output)
@staticmethod
def _parse_column_name(info):
if 'Name' in info:
return info['Name'] + "="
else:
return ""
Node.js
The following snippet uses the AWS SDK for JavaScript V2 style. It is based on the sample application at Node.js sample Amazon Timestream for LiveAnalytics application on GitHub.
function parseQueryResult(response) {
const queryStatus = response.QueryStatus;
console.log("Current query status: " + JSON.stringify(queryStatus));
const columnInfo = response.ColumnInfo;
const rows = response.Rows;
console.log("Metadata: " + JSON.stringify(columnInfo));
console.log("Data: ");
rows.forEach(function (row) {
console.log(parseRow(columnInfo, row));
});
}
function parseRow(columnInfo, row) {
const data = row.Data;
const rowOutput = [];
var i;
for ( i = 0; i < data.length; i++ ) {
info = columnInfo[i];
datum = data[i];
rowOutput.push(parseDatum(info, datum));
}
return `{${rowOutput.join(", ")}}`
}
function parseDatum(info, datum) {
if (datum.NullValue != null && datum.NullValue === true) {
return `${info.Name}=NULL`;
}
const columnType = info.Type;
// If the column is of TimeSeries Type
if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
return parseTimeSeries(info, datum);
}
// If the column is of Array Type
else if (columnType.ArrayColumnInfo != null) {
const arrayValues = datum.ArrayValue;
return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
}
// If the column is of Row Type
else if (columnType.RowColumnInfo != null) {
const rowColumnInfo = info.Type.RowColumnInfo;
const rowValues = datum.RowValue;
return parseRow(rowColumnInfo, rowValues);
}
// If the column is of Scalar Type
else {
return parseScalarType(info, datum);
}
}
function parseTimeSeries(info, datum) {
const timeSeriesOutput = [];
datum.TimeSeriesValue.forEach(function (dataPoint) {
timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`)
});
return `[${timeSeriesOutput.join(", ")}]`
}
function parseScalarType(info, datum) {
return parseColumnName(info) + datum.ScalarValue;
}
function parseColumnName(info) {
return info.Name == null ? "" : `${info.Name}=`;
}
function parseArray(arrayColumnInfo, arrayValues) {
const arrayOutput = [];
arrayValues.forEach(function (datum) {
arrayOutput.push(parseDatum(arrayColumnInfo, datum));
});
return `[${arrayOutput.join(", ")}]`
}
.NET
private void ParseQueryResult(QueryResponse response)
{
List<ColumnInfo> columnInfo = response.ColumnInfo;
var options = new JsonSerializerOptions
{
IgnoreNullValues = true
};
List<String> columnInfoStrings = columnInfo.ConvertAll(x => JsonSerializer.Serialize(x, options));
List<Row> rows = response.Rows;
QueryStatus queryStatus = response.QueryStatus;
Console.WriteLine("Current Query status:" + JsonSerializer.Serialize(queryStatus, options));
Console.WriteLine("Metadata:" + string.Join(",", columnInfoStrings));
Console.WriteLine("Data:");
foreach (Row row in rows)
{
Console.WriteLine(ParseRow(columnInfo, row));
}
}
private string ParseRow(List<ColumnInfo> columnInfo, Row row)
{
List<Datum> data = row.Data;
List<string> rowOutput = new List<string>();
for (int j = 0; j < data.Count; j++)
{
ColumnInfo info = columnInfo[j];
Datum datum = data[j];
rowOutput.Add(ParseDatum(info, datum));
}
return $"{{{string.Join(",", rowOutput)}}}";
}
private string ParseDatum(ColumnInfo info, Datum datum)
{
if (datum.NullValue)
{
return $"{info.Name}=NULL";
}
Amazon.TimestreamQuery.Model.Type columnType = info.Type;
if (columnType.TimeSeriesMeasureValueColumnInfo != null)
{
return ParseTimeSeries(info, datum);
}
else if (columnType.ArrayColumnInfo != null)
{
List<Datum> arrayValues = datum.ArrayValue;
return $"{info.Name}={ParseArray(info.Type.ArrayColumnInfo, arrayValues)}";
}
else if (columnType.RowColumnInfo != null && columnType.RowColumnInfo.Count > 0)
{
List<ColumnInfo> rowColumnInfo = info.Type.RowColumnInfo;
Row rowValue = datum.RowValue;
return ParseRow(rowColumnInfo, rowValue);
}
else
{
return ParseScalarType(info, datum);
}
}
private string ParseTimeSeries(ColumnInfo info, Datum datum)
{
var timeseriesString = datum.TimeSeriesValue
.Select(value => $"{{time={value.Time}, value={ParseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, value.Value)}}}")
.Aggregate((current, next) => current + "," + next);
return $"[{timeseriesString}]";
}
private string ParseScalarType(ColumnInfo info, Datum datum)
{
return ParseColumnName(info) + datum.ScalarValue;
}
private string ParseColumnName(ColumnInfo info)
{
return info.Name == null ? "" : (info.Name + "=");
}
private string ParseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues)
{
return $"[{arrayValues.Select(value => ParseDatum(arrayColumnInfo, value)).Aggregate((current, next) => current + "," + next)}]";
}
Accessing the query status
You can access the query status through QueryResponse
, which contains information about progress of a query, the bytes scanned by a query and the bytes metered by a query. The bytesMetered
and bytesScanned
values are cumulative and continuously updated while paging query results. You can use this information to understand the bytes scanned by an individual query and also use it to make certain decisions. For example, assuming that the query price is 0.01perGBscanned,youmaywanttocancelqueriesthatexceed0.01 per GB scanned, you may want to cancel queries that exceed 0.01perGBscanned,youmaywanttocancelqueriesthatexceed25 per query, or X
GB. The code snippet below shows how this can be done.
Note
These code snippets are based on full sample applications on GitHub. For more information about how to get started with the sample applications, see Sample application.
Java
private static final long ONE_GB_IN_BYTES = 1073741824L;
private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB
public void cancelQueryBasedOnQueryStatus() {
System.out.println("Starting query: " + SELECT_ALL_QUERY);
QueryRequest queryRequest = new QueryRequest();
queryRequest.setQueryString(SELECT_ALL_QUERY);
QueryResult queryResult = queryClient.query(queryRequest);
while (true) {
final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus();
System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%");
double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES);
System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");
// Cancel query if its costing more than 1 cent
if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) {
cancelQuery(queryResult);
break;
}
if (queryResult.getNextToken() == null) {
break;
}
queryRequest.setNextToken(queryResult.getNextToken());
queryResult = queryClient.query(queryRequest);
}
}
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L;
private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB
public void cancelQueryBasedOnQueryStatus() {
System.out.println("Starting query: " + SELECT_ALL_QUERY);
QueryRequest queryRequest = QueryRequest.builder().queryString(SELECT_ALL_QUERY).build();
final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest);
for(QueryResponse queryResponse : queryResponseIterator) {
final QueryStatus currentStatusOfQuery = queryResponse.queryStatus();
System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%");
double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES);
System.out.println("Bytes metered so far: " + bytesMeteredSoFar + "GB");
// Cancel query if its costing more than 1 cent
if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) {
cancelQuery(queryResponse);
break;
}
}
}
Go
const OneGbInBytes = 1073741824
// Assuming the price of query is $0.01 per GB
const QueryCostPerGbInDollars = 0.01
func cancelQueryBasedOnQueryStatus(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) {
queryInput := ×treamquery.QueryInput{
QueryString: aws.String(*queryPtr),
}
fmt.Println("QueryInput:")
fmt.Println(queryInput)
// execute the query
err := querySvc.QueryPages(queryInput,
func(page *timestreamquery.QueryOutput, lastPage bool) bool {
// process query response
queryStatus := page.QueryStatus
fmt.Println("Current query status:", queryStatus)
bytes_metered := float64(*queryStatus.CumulativeBytesMetered) / float64(ONE_GB_IN_BYTES)
if bytes_metered * QUERY_COST_PER_GB_IN_DOLLARS > 0.01 {
cancelQuery(page, querySvc)
return true
}
// query response metadata
// includes column names and types
metadata := page.ColumnInfo
// fmt.Println("Metadata:")
fmt.Println(metadata)
header := ""
for i := 0; i < len(metadata); i++ {
header += *metadata[i].Name
if i != len(metadata)-1 {
header += ", "
}
}
write(f, header)
// query response data
fmt.Println("Data:")
// process rows
rows := page.Rows
for i := 0; i < len(rows); i++ {
data := rows[i].Data
value := processRowType(data, metadata)
fmt.Println(value)
write(f, value)
}
fmt.Println("Number of rows:", len(page.Rows))
return true
})
if err != nil {
fmt.Println("Error:")
fmt.Println(err)
}
}
Python
ONE_GB_IN_BYTES = 1073741824
# Assuming the price of query is $0.01 per GB
QUERY_COST_PER_GB_IN_DOLLARS = 0.01
def cancel_query_based_on_query_status(self):
try:
print("Starting query: " + self.SELECT_ALL)
page_iterator = self.paginator.paginate(QueryString=self.SELECT_ALL)
for page in page_iterator:
query_status = page["QueryStatus"]
progress_percentage = query_status["ProgressPercentage"]
print("Query progress so far: " + str(progress_percentage) + "%")
bytes_metered = query_status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES
print("Bytes Metered so far: " + str(bytes_metered) + " GB")
if bytes_metered * self.QUERY_COST_PER_GB_IN_DOLLARS > 0.01:
self.cancel_query_for(page)
break
except Exception as err:
print("Exception while running query:", err)
traceback.print_exc(file=sys.stderr)
Node.js
The following snippet uses the AWS SDK for JavaScript V2 style. It is based on the sample application at Node.js sample Amazon Timestream for LiveAnalytics application on GitHub.
function parseQueryResult(response) {
const queryStatus = response.QueryStatus;
console.log("Current query status: " + JSON.stringify(queryStatus));
const columnInfo = response.ColumnInfo;
const rows = response.Rows;
console.log("Metadata: " + JSON.stringify(columnInfo));
console.log("Data: ");
rows.forEach(function (row) {
console.log(parseRow(columnInfo, row));
});
}
function parseRow(columnInfo, row) {
const data = row.Data;
const rowOutput = [];
var i;
for ( i = 0; i < data.length; i++ ) {
info = columnInfo[i];
datum = data[i];
rowOutput.push(parseDatum(info, datum));
}
return `{${rowOutput.join(", ")}}`
}
function parseDatum(info, datum) {
if (datum.NullValue != null && datum.NullValue === true) {
return `${info.Name}=NULL`;
}
const columnType = info.Type;
// If the column is of TimeSeries Type
if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
return parseTimeSeries(info, datum);
}
// If the column is of Array Type
else if (columnType.ArrayColumnInfo != null) {
const arrayValues = datum.ArrayValue;
return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
}
// If the column is of Row Type
else if (columnType.RowColumnInfo != null) {
const rowColumnInfo = info.Type.RowColumnInfo;
const rowValues = datum.RowValue;
return parseRow(rowColumnInfo, rowValues);
}
// If the column is of Scalar Type
else {
return parseScalarType(info, datum);
}
}
function parseTimeSeries(info, datum) {
const timeSeriesOutput = [];
datum.TimeSeriesValue.forEach(function (dataPoint) {
timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`)
});
return `[${timeSeriesOutput.join(", ")}]`
}
function parseScalarType(info, datum) {
return parseColumnName(info) + datum.ScalarValue;
}
function parseColumnName(info) {
return info.Name == null ? "" : `${info.Name}=`;
}
function parseArray(arrayColumnInfo, arrayValues) {
const arrayOutput = [];
arrayValues.forEach(function (datum) {
arrayOutput.push(parseDatum(arrayColumnInfo, datum));
});
return `[${arrayOutput.join(", ")}]`
}
.NET
private static readonly long ONE_GB_IN_BYTES = 1073741824L;
private static readonly double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB
private async Task CancelQueryBasedOnQueryStatus(string queryString)
{
try
{
QueryRequest queryRequest = new QueryRequest();
queryRequest.QueryString = queryString;
QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest);
while (true)
{
QueryStatus queryStatus = queryResponse.QueryStatus;
double bytesMeteredSoFar = ((double) queryStatus.CumulativeBytesMetered / ONE_GB_IN_BYTES);
// Cancel query if its costing more than 1 cent
if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01)
{
await CancelQuery(queryResponse);
break;
}
ParseQueryResult(queryResponse);
if (queryResponse.NextToken == null)
{
break;
}
queryRequest.NextToken = queryResponse.NextToken;
queryResponse = await queryClient.QueryAsync(queryRequest);
}
} catch(Exception e)
{
// Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
Console.WriteLine(e.ToString());
}
}
For additional details on how to cancel a query, see Cancel query.