Welcome to pydruid’s documentation! — pydruid 0.3.1 documentation (original) (raw)

pydruid exposes a simple API to create, execute, and analyze Druid queries. pydruid can parse query results into Pandas DataFrame objects for subsequent data analysis, which offers a tight integration between Druid, the SciPy stack (for scientific computing) and scikit-learn (for machine learning). Additionally, pydruid can export query results into TSV or JSON for further processing with your favorite tool, e.g., R, Julia, Matlab, or Excel.

Below is a reference for the PyDruid class, describing the functions to use for querying and exporting, complete with examples. For additional examples, see the pydruid README.

pydruid.client module

class pydruid.client. BaseDruidClient(url, endpoint)

Bases: object

export_pandas()

Export the current query result to a Pandas DataFrame object.

Deprecated since version Use: Query.export_pandas() method instead

export_tsv(dest_path)

Export the current query result to a tsv file.

Deprecated since version Use: Query.export_tsv() method instead.

groupby(**kwargs)

A group-by query groups a results set (the requested aggregate metrics) by the specified dimension(s).

Required key/value pairs:

Parameters: datasource (str) – Data source to query granularity (str) – Time bucket to aggregate data by hour, day, minute, etc., intervals (str or list) – ISO-8601 intervals for which to run the query on aggregations (dict) – A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum dimensions (list) – The dimensions to group by
Returns: The query result
Return type: Query

Optional key/value pairs:

Parameters: filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query having (pydruid.utils.having.Having) – Indicates which groups in results set of query to keep post_aggregations – A dict with string key = ‘post_aggregator_name’, and value pydruid.utils.PostAggregator context (dict) – A dict of query context options limit_spec (dict) – A dict of parameters defining how to limit the rows returned, as specified in the Druid api documentation

Example:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 >>> group = client.groupby( datasource='twitterstream', granularity='hour', intervals='2013-10-04/pt1h', dimensions=["user_name", "reply_to_name"], filter=~(Dimension("reply_to_name") == "Not A Reply"), aggregations={"count": doublesum("count")}, context={"timeout": 1000} limit_spec={ "type": "default", "limit": 50, "columns" : ["count"] } ) >>> for k in range(2): ... print group[k] >>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_1', 'reply_to_name': 'user_2'}} >>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_2', 'reply_to_name': 'user_3'}}

segment_metadata(**kwargs)

A segment meta-data query returns per segment information about:

Required key/value pairs:

Parameters: datasource (str) – Data source to query intervals (str or list) – ISO-8601 intervals for which to run the query on

Optional key/value pairs:

Parameters: context (dict) – A dict of query context options
Returns: The query result
Return type: Query

Example:

>>> meta = client.segment_metadata(datasource='twitterstream', intervals = '2013-10-04/pt1h') >>> print meta[0].keys() >>> ['intervals', 'id', 'columns', 'size'] >>> print meta[0]['columns']['tweet_length'] >>> {'errorMessage': None, 'cardinality': None, 'type': 'FLOAT', 'size': 30908008}

select(**kwargs)

A select query returns raw Druid rows and supports pagination.

Required key/value pairs:

Parameters: datasource (str) – Data source to query granularity (str) – Time bucket to aggregate data by hour, day, minute, etc. paging_spec (dict) – Indicates offsets into different scanned segments intervals (str or list) – ISO-8601 intervals for which to run the query on

Optional key/value pairs:

Parameters: filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query dimensions (list) – The list of dimensions to select. If left empty, all dimensions are returned metrics (list) – The list of metrics to select. If left empty, all metrics are returned context (dict) – A dict of query context options
Returns: The query result
Return type: Query

Example:

>>> raw_data = client.select( datasource=twitterstream, granularity='all', intervals='2013-06-14/pt1h', paging_spec={'pagingIdentifies': {}, 'threshold': 1}, context={"timeout": 1000} ) >>> print raw_data >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]

time_boundary(**kwargs)

A time boundary query returns the min and max timestamps present in a data source.

Required key/value pairs:

Parameters: datasource (str) – Data source to query

Optional key/value pairs:

Parameters: context (dict) – A dict of query context options
Returns: The query result
Return type: Query

Example:

>>> bound = client.time_boundary(datasource='twitterstream') >>> print bound >>> [{'timestamp': '2011-09-14T15:00:00.000Z', 'result': {'minTime': '2011-09-14T15:00:00.000Z', 'maxTime': '2014-03-04T23:44:00.000Z'}}]

timeseries(**kwargs)

A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.

Required key/value pairs:

Parameters: datasource (str) – Data source to query granularity (str) – Time bucket to aggregate data by hour, day, minute, etc., intervals (str or list) – ISO-8601 intervals for which to run the query on aggregations (dict) – A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
Returns: The query result
Return type: Query

Optional key/value pairs:

Parameters: filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query post_aggregations – A dict with string key = ‘post_aggregator_name’, and value pydruid.utils.PostAggregator context (dict) – A dict of query context options

Example:

>>> counts = client.timeseries( datasource=twitterstream, granularity='hour', intervals='2013-06-14/pt1h', aggregations={"count": doublesum("count"), "rows": count("rows")}, post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100))}, context={"timeout": 1000} ) >>> print counts >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}]

topn(**kwargs)

A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are faster and more resource efficient than GroupBy for this use case.

Required key/value pairs:

Parameters: datasource (str) – Data source to query granularity (str) – Aggregate data by hour, day, minute, etc., intervals (str or list) – ISO-8601 intervals of data to query aggregations (dict) – A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum dimension (str) – Dimension to run the query against metric (str) – Metric over which to sort the specified dimension by threshold (int) – How many of the top items to return
Returns: The query result
Return type: Query

Optional key/value pairs:

Parameters: filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query post_aggregations – A dict with string key = ‘post_aggregator_name’, and value pydruid.utils.PostAggregator context (dict) – A dict of query context options

Example:

1 2 3 4 5 6 7 8 9 10 11 12 13 >>> top = client.topn( datasource='twitterstream', granularity='all', intervals='2013-06-14/pt1h', aggregations={"count": doublesum("count")}, dimension='user_name', metric='count', filter=Dimension('user_lang') == 'en', threshold=1, context={"timeout": 1000} ) >>> print top >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user': "cool_user"}}]}]

class pydruid.client. PyDruid(url, endpoint)

Bases: pydruid.client.BaseDruidClient

PyDruid contains the functions for creating and executing Druid queries. Returns Query objects that can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.

Parameters: url (str) – URL of Broker node in the Druid cluster endpoint (str) – Endpoint that Broker listens for queries on

Example

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 >>> from pydruid.client import * >>> query = PyDruid('http://localhost:8083', 'druid/v2/') >>> top = query.topn( datasource='twitterstream', granularity='all', intervals='2013-10-04/pt1h', aggregations={"count": doublesum("count")}, dimension='user_name', filter = Dimension('user_lang') == 'en', metric='count', threshold=2 ) >>> print json.dumps(top.query_dict, indent=2) >>> { "metric": "count", "aggregations": [ { "type": "doubleSum", "fieldName": "count", "name": "count" } ], "dimension": "user_name", "filter": { "type": "selector", "dimension": "user_lang", "value": "en" }, "intervals": "2013-10-04/pt1h", "dataSource": "twitterstream", "granularity": "all", "threshold": 2, "queryType": "topN" } >>> print top.result >>> [{'timestamp': '2013-10-04T00:00:00.000Z', 'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}] >>> df = top.export_pandas() >>> print df >>> count timestamp user_name 0 7 2013-10-04T00:00:00.000Z user_1 1 6 2013-10-04T00:00:00.000Z user_2

pydruid.async_client module

class pydruid.async_client. AsyncPyDruid(url, endpoint)

Bases: pydruid.client.BaseDruidClient

Asynchronous PyDruid client which mirrors functionality of the synchronous PyDruid, but it executes queries asynchronously (using an asynchronous http client from Tornado framework).

Returns Query objects that can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.

Parameters: url (str) – URL of Broker node in the Druid cluster endpoint (str) – Endpoint that Broker listens for queries on

Example

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 >>> from pydruid.async_client import * >>> query = AsyncPyDruid('http://localhost:8083', 'druid/v2/') >>> top = yield query.topn( datasource='twitterstream', granularity='all', intervals='2013-10-04/pt1h', aggregations={"count": doublesum("count")}, dimension='user_name', filter = Dimension('user_lang') == 'en', metric='count', threshold=2 ) >>> print json.dumps(top.query_dict, indent=2) >>> { "metric": "count", "aggregations": [ { "type": "doubleSum", "fieldName": "count", "name": "count" } ], "dimension": "user_name", "filter": { "type": "selector", "dimension": "user_lang", "value": "en" }, "intervals": "2013-10-04/pt1h", "dataSource": "twitterstream", "granularity": "all", "threshold": 2, "queryType": "topN" } >>> print top.result >>> [{'timestamp': '2013-10-04T00:00:00.000Z', 'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}] >>> df = top.export_pandas() >>> print df >>> count timestamp user_name 0 7 2013-10-04T00:00:00.000Z user_1 1 6 2013-10-04T00:00:00.000Z user_2

groupby(**kwargs)

segment_metadata(**kwargs)

select(**kwargs)

time_boundary(**kwargs)

timeseries(**kwargs)

topn(**kwargs)

pydruid.query module

class pydruid.query. Query(query_dict, query_type)

Bases: collections.abc.MutableSequence

Query objects are produced by PyDruid clients and can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis. They also hold information about the issued query.

Query acts as a wrapper over raw result list of dictionaries.

Variables: result_json (str) – JSON object representing a query result. Initial value: None result (list) – Query result parsed into a list of dicts. Initial value: None query_type (str) – Name of most recently run query, e.g., topN. Initial value: None query_dict (dict) – JSON object representing the query. Initial value: None

export_pandas()

Export the current query result to a Pandas DataFrame object.

Returns: The DataFrame representing the query result
Return type: DataFrame
Raises: NotImplementedError

Example

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 >>> top = client.topn( datasource='twitterstream', granularity='all', intervals='2013-10-04/pt1h', aggregations={"count": doublesum("count")}, dimension='user_name', filter = Dimension('user_lang') == 'en', metric='count', threshold=2 ) >>> df = top.export_pandas() >>> print df >>> count timestamp user_name 0 7 2013-10-04T00:00:00.000Z user_1 1 6 2013-10-04T00:00:00.000Z user_2

export_tsv(dest_path)

Export the current query result to a tsv file.

Parameters: dest_path (str) – file to write query results to
Raises: NotImplementedError

Example

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 >>> top = client.topn( datasource='twitterstream', granularity='all', intervals='2013-10-04/pt1h', aggregations={"count": doublesum("count")}, dimension='user_name', filter = Dimension('user_lang') == 'en', metric='count', threshold=2 ) >>> top.export_tsv('top.tsv') >>> !cat top.tsv >>> count user_name timestamp 7.0 user_1 2013-10-04T00:00:00.000Z 6.0 user_2 2013-10-04T00:00:00.000Z

insert(index, value)

parse(data)

class pydruid.query. QueryBuilder

Bases: object

build_query(query_type, args)

Build query based on given query type and arguments.

Parameters: query_type (string) – a type of query args (dict) – the dict of args to be sent
Returns: the resulting query
Return type: Query

groupby(args)

A group-by query groups a results set (the requested aggregate metrics) by the specified dimension(s).

Parameters: args (dict) – dict of args
Returns: group by query
Return type: Query

search(args)

A search query returns dimension values that match the search specification.

Parameters: args (dict) – dict of args
Returns: search query
Return type: Query

segment_metadata(args)

Parameters: args (dict) – dict of args
Returns: segment metadata query
Return type: Query

select(args)

A select query returns raw Druid rows and supports pagination.

Parameters: args (dict) – dict of args
Returns: select query
Return type: Query

time_boundary(args)

A time boundary query returns the min and max timestamps present in a data source.

Parameters: args (dict) – dict of args
Returns: time boundary query
Return type: Query

timeseries(args)

A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.

Parameters: args (dict) – dict of args
Returns: timeseries query
Return type: Query

topn(args)

A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are faster and more resource efficient than GroupBy for this use case.

Parameters: args (dict) – dict of arguments
Returns: topn query
Return type: Query

static validate_query(query_type, valid_parts, args)

Validate the query parts so only allowed objects are sent.

Each query type can have an optional ‘context’ object attached which is used to set certain query context settings, etc. timeout or priority. As each query can have this object, there’s no need for it to be sent - it might as well be added here.

Parameters: query_type (string) – a type of query valid_parts (list) – a list of valid object names args (dict) – the dict of args to be sent
Raises: ValueError – if an invalid object is given

pydruid.utils.aggregators module

pydruid.utils.aggregators. build_aggregators(agg_input)

pydruid.utils.aggregators. cardinality(raw_column, by_row=False)

pydruid.utils.aggregators. count(raw_metric)

pydruid.utils.aggregators. doublesum(raw_metric)

pydruid.utils.aggregators. filtered(filter, agg)

pydruid.utils.aggregators. hyperunique(raw_metric)

pydruid.utils.aggregators. javascript(columns_list, fn_aggregate, fn_combine, fn_reset)

pydruid.utils.aggregators. longsum(raw_metric)

pydruid.utils.aggregators. max(raw_metric)

pydruid.utils.aggregators. min(raw_metric)

pydruid.utils.dimensions module

Bases: pydruid.utils.dimensions.ExtractionFunction

class pydruid.utils.dimensions. DimensionSpec(dimension, output_name, extraction_function=None)

Bases: object

build()

Bases: object

Bases: pydruid.utils.dimensions.ExtractionFunction

Bases: pydruid.utils.dimensions.ExtractionFunction

Bases: pydruid.utils.dimensions.LookupExtraction

Bases: pydruid.utils.dimensions.LookupExtraction

Bases: pydruid.utils.dimensions.BaseRegexExtraction

Bases: pydruid.utils.dimensions.BaseRegexExtraction

Bases: pydruid.utils.dimensions.ExtractionFunction

pydruid.utils.dimensions. build_dimension(dim)

pydruid.utils.filters module

class pydruid.utils.filters. Bound(dimension, lower, upper, lowerStrict=False, upperStrict=False, alphaNumeric=False)

Bases: pydruid.utils.filters.Filter

Bound filter can be used to filter by comparing dimension values to an upper value or/and a lower value.

Variables: dimension (str) – Dimension to filter on. lower (str) – Lower bound. upper (str) – Upper bound. lowerStrict (bool) – Strict lower inclusion. Initial value: False upperStrict (bool) – Strict upper inclusion. Initial value: False alphaNumeric (bool) – Numeric comparison. Initial value: False

class pydruid.utils.filters. Dimension(dim)

Bases: object

class pydruid.utils.filters. Filter(**args)

Bases: object

static build_filter(filter_obj)

show()

class pydruid.utils.filters. JavaScript(dim)

Bases: object

pydruid.utils.having module

class pydruid.utils.having. Aggregation(agg)

Bases: object

class pydruid.utils.having. Having(**args)

Bases: object

static build_having(having_obj)

show()

pydruid.utils.postaggregator module

class pydruid.utils.postaggregator. Const(value, output_name=None)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator. Field(name)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator. HyperUniqueCardinality(name)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator. Postaggregator(fn, fields, name)

Bases: object

static build_post_aggregators(postaggs)

fields(other)

class pydruid.utils.postaggregator. Quantile(name, probability)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator. Quantiles(name, probabilities)

Bases: pydruid.utils.postaggregator.Postaggregator