Commit 79648d29 authored by Pietro Tosato's avatar Pietro Tosato
Browse files

TB first implementation

parent 3b8fbc33
import requests
import json
from datetime import datetime, timedelta
from requests.compat import urljoin
import pandas as pd
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
base_url = "https://thingsboard.raptorbox.eu:443/"
my_headers = {}
session = requests.Session()
def login(username: str, password: str):
"""
setup the connection to TB, i.e. retrieve the access token.
Args:
username (str): Thingsboard username
password (str): Thingsboard password
"""
# prepare HTTP call
my_headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
# post request for token
response = requests.post('https://thingsboard.raptorbox.eu/api/auth/login',
data = json.dumps({'username': username, 'password': password}),
headers = my_headers)
my_token = response.json()['token']
session.headers.update({'X-Authorization': 'Bearer ' + my_token})
# update header
my_headers = {'Accept': 'application/json'}
def get_dev_id(device_name: str) -> str:
"""
Get the device id of a device, this is a *tenant API*, do not work for customer users
Args:
device_name (str): name of the device in TB
Returns:
str: device id
"""
response = session.get(url='https://thingsboard.raptorbox.eu:443/api/tenant/devices',
params={'deviceName': device_name},
headers=my_headers)
response_data = json.loads(response._content)
# print(json.dumps( response.json(), indent=3) )
return (response_data['id']['id'])
def get_dev_keys(device_id: str) -> str:
"""
Get device' keys
Args:
device_id (str): id of the device
Returns:
str: array of keys available
"""
# setup url
device_api_url = "api/plugins/telemetry/DEVICE/"
device_api_url_keysval = "/keys/timeseries"
request_url = urljoin(base_url, device_api_url+device_id+device_api_url_keysval)
response = session.get(url=request_url, headers=my_headers)
response_data = json.loads(response._content)
return (response_data)
def get_ts_data(start_day: int, end_day: int, device_id: str, keys: list, aggregation_interval: int, verbose=False) -> dict:
"""
Query TB for timeseries data.
Args:
start_day (int): initial timestamp (unix)
end_day (int): end timestamp (unix)
device_id (str): device
keys (list): list of timeseries keys
aggregation_interval (int): moving average interval (seconds)
verbose (bool, optional): print out info. Defaults to False.
Returns:
dict: json as dict
"""
# Thingsboard need timestamp in ms
time_ms_start = int(start_day*1000)
time_ms_end = int(end_day*1000)
data_len = (time_ms_end-time_ms_start)/aggregation_interval/1000*len(keys)
# prepare parameters
query_params = {'interval': aggregation_interval*1000, # in ms
'limit': int(data_len),
'agg': 'AVG', # MIN, MAX, AVG, SUM, COUNT, NONE
'orderBy': 'DESC',
'useStrictDataTypes': True,
'keys': keys,
'startTs': str(time_ms_start),
'endTs': str(time_ms_end)}
# debug print
if verbose:
for key in keys:
print( "ask for " + str(int((time_ms_end-time_ms_start)/aggregation_interval/1000)) + " entries for " + str(key))
# setup url
device_api_url = "api/plugins/telemetry/DEVICE/"
device_api_url_tsval = "/values/timeseries"
request_url = urljoin(base_url, device_api_url+device_id+device_api_url_tsval)
response = session.get(url=request_url, params=query_params, headers=my_headers)
if (response.status_code != 200):
# raise Exception('BAD HTTP request, maybe the length of data is too long')
print('BAD HTTP request, maybe the length of data is too long')
else:
try:
for key in keys:
print( "got " + str(len(response.json()[key])) + " entries for " + str(key)) if verbose else 0
except KeyError:
print(f"{bcolors.WARNING}JSON error in response, maybe that day something was wrong...{bcolors.ENDC}")
return json.loads(response._content)
def flatten_json(input_data: dict, keys: list, verbose=False) -> pd.DataFrame:
"""
Get input data as dict and return a flattened JSON as pandas dataframe
Args:
input_data (dict): JSON containing data
keys (list): list of keys of the dictionary
verbose (bool, optional): print out info. Defaults to False.
Raises:
ValueError: ValueError in case the data is not aligned
Returns:
pd.DataFrame: same data
"""
# check for length of data:
for key in keys:
if (len(input_data[key]) != len(input_data[keys[0]])):
# print("data length mismatch!")
print(f"{bcolors.WARNING}data length mismatch!{bcolors.ENDC}")
# raise ValueError('Wrong size of data!')
data = pd.DataFrame({ 'ts': [0]})
# flattening and merging
for key in keys:
temp = pd.json_normalize(input_data, record_path = key )
print('merge ' + str(temp.shape) + ' elements of ' + key) if verbose else 0
data = pd.merge(data, temp, on='ts', how='right')
# fix axis label
# data.drop('value_y', axis=1, inplace=True)
data.set_axis(['ts', *keys], axis=1, inplace=True)
# set datetime to first column
data['ts'] = pd.to_datetime( data['ts'], unit='ms', origin='unix')
return data
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment