# BCO Persistence

This section introduces how InfluxDB and the BCO Influxdb Connector app can be used to store the history of unit service state changes. This is useful, for example, to compute and monitor the current economy level of the smart environment.

# How to set up InfluxDB

Get started with InfluxDB v2.0 to collect your data.
After you have set up your initial user, bucket, and organization, you can use the database.

INFO

Please choose bco-persistence as the bucket name and openbase as the organization name to simplify this setup.

# How to set up the BCO Influxdb Connector App

The BCO Influxdb Connector is a BCO app that stores all unit changes in InfluxDB.

# 1. Register the new App at the UnitRegistry

To install the InfluxDbConnector, you need to register it by using the bco-registry-editor. Make sure you are connected to your BCO instance and start the bco-registry-editor. Then navigate to: UnitRegistry → App


Now add a new unit with right click → Add

To add the InfluxDB connector class to the new unit, select InfluxDB Connector as AppClassId and press apply.


# 2. Authenticate and Configure the App via Meta Configs

Next, you have to set up an authentication token so that the app is able to store new data in InfluxDB. Look up the token via the Chronograf interface (default: http://localhost:9999) and copy it. Then paste it into a new MetaConfig entry of the BCO Influxdb Connector via the bco-registry-editor, e.g. INFLUXDB_TOKEN = PASTE_TOKEN_HERE

INFO

If you chose the default values during the InfluxDB setup and InfluxDB runs on the same host as BCO, all values except INFLUXDB_TOKEN are optional.

Further configurable meta config entries are:

  • INFLUXDB_URL → Url of your InfluxDB
    DEFAULT: INFLUXDB_URL = http://localhost:9999
  • INFLUXDB_BUCKET → Name of the bucket where your data will be stored
    DEFAULT: INFLUXDB_BUCKET = bco-persistence
  • INFLUXDB_BATCH_TIME → Time limit (ms) after which your batch is written to the database
    DEFAULT: INFLUXDB_BATCH_TIME = 1000
  • INFLUXDB_BATCH_LIMIT → Max size of your batch
    DEFAULT: INFLUXDB_BATCH_LIMIT = 100
  • INFLUXDB_ORG → Org for the bucket
    DEFAULT: INFLUXDB_ORG = openbase
  • INFLUXDB_TOKEN → Token with read and write access to your database
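
The way these entries combine with their defaults can be sketched as follows. This is a minimal illustration only: the class `InfluxDbMetaConfigSketch` and its `resolve` method are hypothetical and not part of BCO, and the real app may resolve its meta config differently. The default values are the ones listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of BCO: merges user-supplied meta config
// entries with the documented defaults.
final class InfluxDbMetaConfigSketch {

    // Default values as documented in the list above.
    static final Map<String, String> DEFAULTS = Map.of(
            "INFLUXDB_URL", "http://localhost:9999",
            "INFLUXDB_BUCKET", "bco-persistence",
            "INFLUXDB_BATCH_TIME", "1000",
            "INFLUXDB_BATCH_LIMIT", "100",
            "INFLUXDB_ORG", "openbase");

    // Returns the effective configuration; fails if the mandatory token is missing.
    static Map<String, String> resolve(Map<String, String> metaConfig) {
        String token = metaConfig.get("INFLUXDB_TOKEN");
        if (token == null || token.trim().isEmpty()) {
            throw new IllegalArgumentException("INFLUXDB_TOKEN has no default and must be set");
        }
        Map<String, String> effective = new HashMap<>(DEFAULTS);
        effective.putAll(metaConfig); // explicit entries override the defaults
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> effective = resolve(Map.of("INFLUXDB_TOKEN", "PASTE_TOKEN_HERE"));
        System.out.println("url    = " + effective.get("INFLUXDB_URL"));
        System.out.println("bucket = " + effective.get("INFLUXDB_BUCKET"));
    }
}
```

Only INFLUXDB_TOKEN is treated as mandatory here, mirroring the note above that all other values are optional when the defaults apply.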

# How to query InfluxDB

InfluxDB 2.0 uses Flux as its functional data scripting language. A good guide on how to get started with Flux is provided by the official InfluxDB documentation.

# How to create a Query

Chronograf is the user interface and administrative component of the InfluxDB platform. It is already included in InfluxDB 2.0. With Chronograf you can quickly inspect your data and build dashboards.

To do so, log in to the Chronograf web view and select the Data Explorer.

If you have run bco-test --simulate and collected some data in your bucket, you should see some measurements.

For example, a query can select the field consumption from the measurement power_consumption_state_service for the tag alias PowerConsumptionSensor-11.
Chronograf generates the corresponding Flux query, which you can inspect by selecting the 'Script Editor'.
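
As a rough idea of what such a generated Flux query looks like, the following sketch assembles one as a string in Java. It is not the exact output of Chronograf. The tag key `alias`, the field name `consumption`, and the relative range `-1h` are assumptions taken from the description above, and the helper class `FluxQuerySketch` is hypothetical.

```java
// Hypothetical helper: assembles a Flux query matching the selection described above.
final class FluxQuerySketch {

    // Escapes backslashes and double quotes so a value can be embedded in a Flux string literal.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // range is passed through unchanged, e.g. a relative duration such as "-1h".
    static String build(String bucket, String measurement, String field, String alias, String range) {
        return "from(bucket: " + quote(bucket) + ")\n"
                + "  |> range(start: " + range + ")\n"
                + "  |> filter(fn: (r) => r._measurement == " + quote(measurement) + ")\n"
                + "  |> filter(fn: (r) => r._field == " + quote(field) + ")\n"
                + "  |> filter(fn: (r) => r.alias == " + quote(alias) + ")";
    }

    public static void main(String[] args) {
        System.out.println(build("bco-persistence", "power_consumption_state_service",
                "consumption", "PowerConsumptionSensor-11", "-1h"));
    }
}
```

The printed text can be pasted into the 'Script Editor' to compare it with the query Chronograf generates for the same selection.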

There are more options to visualize the data, such as raw data, histogram, table, etc. You can also save your graphs into dashboards.

If you want to know more about the possibilities of Chronograf, have a look at the official documentation: Chronograf Documentation

# Heartbeat

The database contains a measurement 'heartbeat' with the field 'alive'. When the InfluxDB connector app is started, the value 1 is written into this field, and it is written again every 15 minutes. When the app is closed, the value 0 is written into the field. This can be used to check when the database was functional and storing data, so that you can account for possible downtimes in queries and calculations.

Source Code
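
One way to turn heartbeat samples into uptime intervals is sketched below. It is an illustration under assumptions, not the connector's own logic: samples are assumed to be sorted by time and given as Unix seconds, a tolerance is added on top of the 15 minute period, and a missing heartbeat is interpreted as the app having stopped without writing 0. The class `HeartbeatUptimeSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of BCO: derives uptime intervals from heartbeat samples.
final class HeartbeatUptimeSketch {

    // The connector rewrites alive = 1 every 15 minutes.
    static final long HEARTBEAT_PERIOD_SECONDS = 15 * 60;

    /**
     * @param beats samples sorted by time, each as {epochSeconds, alive}
     * @param toleranceSeconds extra slack before a heartbeat counts as missed (assumption)
     * @return intervals {start, end} during which the connector was presumably running
     */
    static List<long[]> uptime(long[][] beats, long toleranceSeconds) {
        List<long[]> intervals = new ArrayList<>();
        boolean open = false;
        long start = 0;
        long lastSeen = 0;
        for (long[] beat : beats) {
            long time = beat[0];
            boolean alive = beat[1] == 1;
            if (open && time - lastSeen > HEARTBEAT_PERIOD_SECONDS + toleranceSeconds) {
                // A heartbeat is missing: assume the app stopped without writing 0.
                intervals.add(new long[]{start, lastSeen});
                open = false;
            }
            if (alive) {
                if (!open) {
                    start = time;
                    open = true;
                }
                lastSeen = time;
            } else if (open) {
                // Regular shutdown: the interval ends when 0 is written.
                intervals.add(new long[]{start, time});
                open = false;
            }
        }
        if (open) {
            intervals.add(new long[]{start, lastSeen});
        }
        return intervals;
    }

    public static void main(String[] args) {
        long[][] beats = {{0, 1}, {900, 1}, {1800, 1}, {2000, 0}, {5000, 1}, {5900, 1}, {9000, 1}};
        for (long[] interval : uptime(beats, 60)) {
            System.out.println(interval[0] + " .. " + interval[1]);
        }
    }
}
```

Queries and calculations can then be restricted to the returned intervals, so that periods without stored data are not mistaken for periods without state changes.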

# Service Aggregation

It is possible to perform a service aggregation via the function queryAggregatedServiceState of a unit. This function needs a QueryType as a parameter. Important attributes of the QueryType for the service aggregation are:

  • measurement
  • service_type
  • time_range_start
  • time_range_stop
  • aggregation_window

For an overview of the QueryType, see: Query

The method returns an AggregatedServiceState, which contains the service_type, the query, and the aggregated_service_type. Depending on whether the requested service_type is an enum or not, the aggregated_service_type consists either of the percentages of how often each state was active or of the average values.

// Query for continuous data
Query query = Query.newBuilder()
     .setMeasurement("power_consumption_state_service")
     .setServiceType(ServiceTemplateType.ServiceTemplate.ServiceType.POWER_CONSUMPTION_STATE_SERVICE)
     .setTimeRangeStart(TimestampType.Timestamp.newBuilder().setTime(time - 3600).build())
     .setTimeRangeStop(TimestampType.Timestamp.newBuilder().setTime(time).build())
     .setAggregatedWindow("1m")
     .build();
AggregatedServiceStateType.AggregatedServiceState aggregatedServiceState = testLocation.queryAggregatedServiceState(query).get();
// Query for enum data
Query enumQuery = Query.newBuilder()
     .setMeasurement("button_state_service")
     .setServiceType(ServiceTemplateType.ServiceTemplate.ServiceType.BUTTON_STATE_SERVICE)
     .setTimeRangeStart(TimestampType.Timestamp.newBuilder().setTime(time - 3600).build())
     .setTimeRangeStop(TimestampType.Timestamp.newBuilder().setTime(time).build())
     .setAggregatedWindow("1m")
     .build();

AggregatedServiceStateType.AggregatedServiceState aggregatedEnumServiceState = testLocation.queryAggregatedServiceState(enumQuery).get();

A full example of how to query an aggregated service state is available here: HowToQueryAggregatedState
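
To illustrate the two result shapes (state percentages for enum services, averages for continuous services), here is a small self-contained sketch. It does not reproduce how BCO or InfluxDB compute the aggregation. It weights states by sample count rather than by duration, which is a simplification, and the class `AggregationSketch` and the state names are purely illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the two aggregation kinds, not BCO code.
final class AggregationSketch {

    // Enum-like services: fraction of samples observed in each state.
    static Map<String, Double> stateShares(List<String> states) {
        Map<String, Double> shares = new LinkedHashMap<>();
        for (String state : states) {
            shares.merge(state, 1.0, Double::sum); // count occurrences per state
        }
        shares.replaceAll((state, count) -> count / states.size());
        return shares;
    }

    // Continuous services: arithmetic mean of the samples (NaN if there are none).
    static double average(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        System.out.println(stateShares(List.of("PRESSED", "RELEASED", "RELEASED", "RELEASED")));
        System.out.println(average(List.of(10.0, 20.0, 30.0)));
    }
}
```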

# Query Database

You can also send raw queries to the database via the units by using the queryRecord function. This function also takes a QueryType as a parameter, but the only attribute that must be set is raw_query. The method returns a RecordCollection, which consists of Records. The Chronograf and query sections above explain what a raw query looks like and how it can be built.

An example request looks like:

String query = "from(bucket: \"bco-persistence\")\n" +
     "  |> range(start:" + (time - 3600) + ", stop: " + time + ")\n" +
     "  |> filter(fn: (r) => r._measurement == \"power_consumption_state_service\")";
RecordCollectionType.RecordCollection recordCollection = testLocation.queryRecord(Query.newBuilder().setRawQuery(query).build()).get();

A full example of how to query a database record is available here: HowToQueryUnitLongTermStateUpdates.
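
The example request above uses a variable `time` that is not defined in the snippet. The sketch below shows one way the time window could be derived, assuming `time` is a Unix timestamp in seconds (Flux `range()` accepts integer Unix timestamps in seconds). The helper class `RawQueryRangeSketch` is hypothetical and only builds the query string; sending it via queryRecord works as shown above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: builds the raw Flux query for a window ending at a given instant.
final class RawQueryRangeSketch {

    static String lastWindowQuery(String bucket, String measurement, Instant stop, Duration window) {
        long stopSeconds = stop.getEpochSecond();               // assumed unit: Unix seconds
        long startSeconds = stopSeconds - window.getSeconds();  // e.g. 3600 for one hour
        return "from(bucket: \"" + bucket + "\")\n"
                + "  |> range(start: " + startSeconds + ", stop: " + stopSeconds + ")\n"
                + "  |> filter(fn: (r) => r._measurement == \"" + measurement + "\")";
    }

    public static void main(String[] args) {
        System.out.println(lastWindowQuery("bco-persistence", "power_consumption_state_service",
                Instant.now(), Duration.ofHours(1)));
    }
}
```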

Last Updated: 10/17/2019, 11:31:58 AM