Sources and Measurements Example
This example shows how to upload measurements from an external repository into the MultiViz Analytics Engine (MVG) service and how to read this data.
[1]:
import json
import os
from pathlib import Path
from tqdm.notebook import tqdm
import pandas as pd
# import mvg library with python bindings to mvg-API
from mvg import MVG
from mvg.exceptions import MVGAPIError
Note
Each TOKEN is used for both authorization and authentication. Each unique token therefore represents a unique user, and each user has their own database on the VA-MVG service.
Insert the token you received from Viking Analytics here: replace os.environ['TEST_TOKEN'] with your token as a string.
[2]:
ENDPOINT = "https://api.beta.multiviz.com"
# Replace by your own Token
VALID_TOKEN = os.environ['TEST_TOKEN']
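If the environment variable is missing, the cell above fails with a bare KeyError. A minimal sketch of a friendlier lookup follows; the helper name load_token is hypothetical and not part of the mvg library.

```python
import os


def load_token(env_var="TEST_TOKEN"):
    """Return the token stored in env_var, or raise a descriptive error."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(
            f"Environment variable {env_var} is empty or not set; "
            "export your token before running the notebook."
        )
    return token
```

With this helper, the assignment could read VALID_TOKEN = load_token().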
Downloading the Data
For convenience, we take the data from our public charlie repo https://github.com/vikinganalytics/va-data-charlie.git and use only some of its sources. Clone that repository to get access to the data.
[ ]:
!git clone https://github.com/vikinganalytics/va-data-charlie.git
We are going to use six of the sources from the charlie dataset, with IDs u0001 to u0006.
[3]:
REF_DB_PATH = Path.cwd() / "va-data-charlie" / "charlieDb" / "acc"
SOURCE_IDS = ["u0001", "u0002", "u0003", "u0004", "u0005", "u0006"]
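Before talking to the API, it can help to confirm that the clone produced the expected folders. The sketch below assumes, as the later cells do, that each source has its own directory containing a meta.json file; the helper name missing_sources is hypothetical.

```python
from pathlib import Path


def missing_sources(db_path, source_ids):
    """Return the source IDs that have no meta.json under db_path/<id>/."""
    db_path = Path(db_path)
    return [
        sid for sid in source_ids
        if not (db_path / sid / "meta.json").is_file()
    ]
```

Calling missing_sources(REF_DB_PATH, SOURCE_IDS) should return an empty list if the repository was cloned into the current working directory.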
Connect to the API
Instantiate a session object with the MVG library. A session object essentially caches the endpoint and the token, which simplifies calls to the MVG library.
[4]:
session = MVG(ENDPOINT, VALID_TOKEN)
We now check if the server is alive. The hello message contains, among other things, the API version.
[5]:
session.say_hello()
[5]:
{'name': 'MultiViz Engine API',
'version': 'v0.3.2',
'swagger': 'http://api.beta.multiviz.com/docs'}
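The version string in the hello message can be turned into a tuple when a script needs to compare versions. This is a small sketch that assumes the 'vMAJOR.MINOR.PATCH' form shown in the output above; parse_version is a hypothetical helper, and the dictionary is copied from that output rather than fetched from the server.

```python
def parse_version(version):
    """Turn a string such as 'v0.3.2' into a tuple of ints, here (0, 3, 2)."""
    return tuple(int(part) for part in version.lstrip("v").split("."))


# Copied from the say_hello() output above, not fetched from the server
hello = {"name": "MultiViz Engine API", "version": "v0.3.2"}
api_version = parse_version(hello["version"])
print(api_version)  # (0, 3, 2)
```

Tuples compare element by element, so (0, 10, 0) sorts after (0, 3, 2), which a plain string comparison would get wrong.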
Check Database
We start by checking whether there are any sources in the database. A source represents a measurement source, e.g. a measurement sensor. Note that a source can also represent a sensor with several channels.
We list all sources (potentially none) and print their information. The source used in this example is deleted in a later step so that we start from a clean slate.
[6]:
sources = session.list_sources()
for src in sources:
    # While the list returned contains all information
    # about all known sources, it is also possible
    # to query for a single source by its id
    s_info = session.get_source(src['source_id'])
    print(f"Source info retrieved:\n{s_info}\n")
Source info retrieved:
{'source_id': 'u0001', 'meta': {'assetId': 'assetJ', 'measPoint': 'mloc01', 'location': 'cancun', 'timezone': 'Europe/Stockholm'}, 'properties': {'data_class': 'waveform', 'channels': ['acc']}}
The example below revolves around a source with source id u0001.
[7]:
SOURCE_ID = SOURCE_IDS[0]
To make sure we start from a clean slate we delete our source in case it exists.
[8]:
try:
    source = session.get_source(SOURCE_ID)
    session.delete_source(SOURCE_ID)
    print(f"Source {SOURCE_ID} deleted")
except MVGAPIError:
    print(f"Source {SOURCE_ID} does not exist")
Source u0001 deleted
Build measurements
Now we want to (re)build the source and the attached measurements from scratch. In this example, we have a JSON file with all the information needed to create each source.
[9]:
src_path = REF_DB_PATH / SOURCE_ID
m_file_name = REF_DB_PATH / SOURCE_ID / "meta.json"
with open(m_file_name, "r") as json_file:
    meta = json.load(json_file)
print("Creating meta info")
print(meta)
Creating meta info
{'assetId': 'assetA', 'measPoint': 'mloc01', 'location': 'paris'}
Create the source and check for it.
[10]:
session.create_source(SOURCE_ID, channels=["acc"], meta=meta) # create
source = session.get_source(SOURCE_ID)
print("Recreated source info:")
print(source)
Recreated source info:
{'source_id': 'u0001', 'meta': {'assetId': 'assetA', 'measPoint': 'mloc01', 'location': 'paris'}, 'properties': {'data_class': 'waveform', 'channels': ['acc']}}
Update the source in case it is necessary.
[11]:
meta['updated'] = "YES! I have been updated"
session.update_source(SOURCE_ID, meta) # update
source = session.get_source(SOURCE_ID)
print("Updated source info")
print(source)
Updated source info
{'source_id': 'u0001', 'meta': {'assetId': 'assetA', 'measPoint': 'mloc01', 'location': 'paris', 'updated': 'YES! I have been updated'}, 'properties': {'data_class': 'waveform', 'channels': ['acc']}}
Upload measurements to the source. Measurements are tied to sources; each measurement consists of
an array of floating point values with a header indicating the channel name,
a timestamp indicating when the values were sampled,
a field for the duration of the measurement, and
meta information to be stored along with the measurement.
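The components listed above map onto the keyword arguments used by the upload cell later in this example (sid, duration, timestamp, data, meta). The sketch below only assembles and checks such a dictionary; measurement_kwargs is a hypothetical helper, and the validation rules in it are assumptions rather than documented API requirements.

```python
def measurement_kwargs(source_id, channel, samples, timestamp, duration, meta=None):
    """Assemble the keyword arguments for one measurement upload."""
    if duration <= 0:
        raise ValueError("duration must be positive")
    if not samples:
        raise ValueError("samples must not be empty")
    return {
        "sid": source_id,
        "duration": float(duration),
        "timestamp": int(timestamp),
        # One list of floats per channel name
        "data": {channel: [float(x) for x in samples]},
        # Copy so that later edits do not change the caller's dict
        "meta": dict(meta or {}),
    }


kwargs = measurement_kwargs("u0001", "acc", [0.5, -0.25], "1570186860", 2)
```

The resulting dictionary could then be unpacked into the call, e.g. session.create_measurement(**kwargs).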
[12]:
meas = [f.stem for f in Path(src_path).glob("*.csv")]
meas now contains a list of timestamps representing the measurements in the repo we upload from. We proceed to iterate over all of them.
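Path.glob does not guarantee any particular order, and the file stems are strings. If a chronological upload order is wanted, the stems can be converted to integers and sorted, as in this sketch (sorted_timestamps is a hypothetical helper; the short stems are made up to show the difference).

```python
def sorted_timestamps(stems):
    """Convert file stems such as '1570186860' to ints in ascending order."""
    return sorted(int(stem) for stem in stems)


stems = ["999", "10000", "1570186860"]
print(sorted(stems))             # ['10000', '1570186860', '999'] (string order)
print(sorted_timestamps(stems))  # [999, 10000, 1570186860] (numeric order)
```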
[13]:
print(f"Uploading measurements to source {SOURCE_ID}")
for m in tqdm(meas):
    # samples file for one measurement
    TS_MEAS = str(m) + ".csv"  # filename
    TS_MEAS = REF_DB_PATH / SOURCE_ID / TS_MEAS  # path to file
    ts_df = pd.read_csv(TS_MEAS)  # read csv into df
    accs = ts_df.iloc[:, 0].tolist()  # convert to list

    # meta information file for one measurement
    TS_META = str(m) + ".json"  # filename
    TS_META = REF_DB_PATH / SOURCE_ID / TS_META  # path
    with open(TS_META, "r") as json_file:  # read json
        meas_info = json.load(json_file)  # into dict

    # get duration and other meta info
    duration = meas_info['duration']
    meta_info = meas_info['meta']
    # add sampling rate, not required by vibration API
    # but may be used on client side
    # in general any information can be stored
    # along the actual samples
    meta_info['sampling_rate'] = len(accs)/duration
    # <<< end of code specific for repo

    try:
        # see mvg for details on this call
        session.create_measurement(sid=SOURCE_ID,
                                   duration=duration,
                                   timestamp=int(m),
                                   data={"acc": accs},
                                   meta=meta_info)
    except MVGAPIError as exc:
        print(exc)
Uploading measurements to source u0001
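The repository-specific part of the loop above can be isolated in a small function. The following sketch uses only the standard library instead of pandas; it assumes the CSV file has one header row and the samples in its first column, which mirrors what pd.read_csv followed by iloc[:, 0] reads by default. The name load_measurement is hypothetical.

```python
import csv
import json
from pathlib import Path


def load_measurement(src_dir, stem):
    """Read <stem>.csv and <stem>.json and return (samples, duration, meta)."""
    src_dir = Path(src_dir)
    with open(src_dir / f"{stem}.csv", newline="") as csv_file:
        rows = csv.reader(csv_file)
        next(rows, None)  # skip the header row
        samples = [float(row[0]) for row in rows if row]
    with open(src_dir / f"{stem}.json") as json_file:
        info = json.load(json_file)
    meta = dict(info["meta"])
    # Same client-side convenience value as in the loop above
    meta["sampling_rate"] = len(samples) / info["duration"]
    return samples, info["duration"], meta
```

Keeping file handling separate from the API call makes it possible to check the parsing on a single file before uploading anything.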
Read the Measurements
Check if we actually created the measurements by reading them.
[14]:
m = session.list_measurements(SOURCE_ID)
print(f"Read {len(m)} stored measurements")
Read 50 stored measurements
It is also possible to read a specific measurement at a single timestamp. Let’s get the timestamp of the first measurement.
[15]:
ts_0 = m[0]['timestamp']
meas_0 = session.read_single_measurement(SOURCE_ID, ts_0)
# we'll printout the returned measurement
print(f"source_id: {SOURCE_ID}")
print(f"timestamp: {ts_0}")
print(f"duration: {meas_0['duration']}")
print(f"meta: {meas_0['meta']}")
print(f"data: {meas_0['data']['acc'][1:3]}...")
source_id: u0001
timestamp: 1570186860
duration: 2.8672073400507907
meta: {'sampling_rate': 13950.857142857141}
data: [0.63897705078125, -0.55078125]...
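The printed fields can be cross-checked on the client side. The sketch below assumes that timestamps are Unix epoch seconds, which the notebook does not state explicitly, and uses the values from the output above. Because sampling_rate was stored as the number of samples divided by the duration, multiplying the two recovers the sample count.

```python
from datetime import datetime, timezone

# Values copied from the output above
timestamp = 1570186860
duration = 2.8672073400507907
sampling_rate = 13950.857142857141

# Assumption: the timestamp counts seconds since 1970-01-01 UTC
sampled_at = datetime.fromtimestamp(timestamp, tz=timezone.utc)
n_samples = round(duration * sampling_rate)

print(sampled_at.isoformat())  # 2019-10-04T11:01:00+00:00
print(n_samples)               # 40000
```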
We can update the meta information for a measurement at a single timestamp. Let’s get the timestamp of the first measurement.
[16]:
ts_0 = m[0]['timestamp']
new_meta = meas_0['meta']
new_meta['updated'] = "YES!"
session.update_measurement(SOURCE_ID, ts_0, new_meta)
meas_0_u = session.read_single_measurement(SOURCE_ID, ts_0)
# we'll printout the returned measurement
print(f"source_id: {SOURCE_ID}")
print(f"timestamp: {ts_0}")
print(f"duration: {meas_0_u['duration']}")
print(f"meta: {meas_0_u['meta']}")
print(f"data: {meas_0_u['data']['acc'][1:3]}...")
source_id: u0001
timestamp: 1570186860
duration: 2.8672073400507907
meta: {'sampling_rate': 13950.857142857141, 'updated': 'YES!'}
data: [0.63897705078125, -0.55078125]...
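Note that new_meta = meas_0['meta'] in the cell above does not copy the dictionary, so adding a key also changes meas_0 in place. That is harmless here, but when the original should stay untouched, a shallow copy avoids it. The dictionaries in this sketch are stand-ins with made-up values, not data returned by the API.

```python
# Stand-in for a measurement returned by the API (made-up values)
meas_a = {"duration": 2.0, "meta": {"sampling_rate": 100.0}}
aliased = meas_a["meta"]          # same dict object, no copy
aliased["updated"] = "YES!"
print("updated" in meas_a["meta"])  # True: the original changed too

meas_b = {"duration": 2.0, "meta": {"sampling_rate": 100.0}}
copied = dict(meas_b["meta"])     # shallow copy
copied["updated"] = "YES!"
print("updated" in meas_b["meta"])  # False: the original is untouched
```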
Finally, we delete the measurement.
[18]:
session.delete_measurement(SOURCE_ID, ts_0)
We check if it’s actually deleted.
[17]:
try:
    meas_0 = session.read_single_measurement(SOURCE_ID, ts_0)
except MVGAPIError:
    print("Previously deleted measurement does not exist")
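The try/except pattern used for existence checks in this example can be wrapped in a generic helper. The sketch below is independent of the mvg library: exists is a hypothetical function, and the dictionary lookup merely stands in for a call such as session.read_single_measurement, with KeyError playing the role of MVGAPIError.

```python
def exists(fetch, *args, missing_error=Exception):
    """Return True if fetch(*args) succeeds, False if it raises missing_error."""
    try:
        fetch(*args)
    except missing_error:
        return False
    return True


# Stand-in store keyed by (source id, timestamp), with made-up content
store = {("u0001", 1570186860): {"duration": 2.0}}


def fetch(source_id, timestamp):
    return store[(source_id, timestamp)]


print(exists(fetch, "u0001", 1570186860, missing_error=KeyError))  # True
print(exists(fetch, "u0001", 0, missing_error=KeyError))           # False
```

One caveat under this assumption: if the API raises the same exception type for other failures, such as an invalid token, the helper would report those as "does not exist" as well.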
If all went well we end up here now. The source u0001 is in the database along with a number of its measurements.
Uploading additional sources
Here we show how to upload additional sources in a more compact way.
[19]:
sources = SOURCE_IDS[1:]
print(f"Uploading sources {', '.join(sources)} from CharlieDb")
for source_id in tqdm(sources):
    try:
        session.delete_source(source_id)
    except MVGAPIError as exc:
        pass  # Source didn't exist

    print(f"Creating source {source_id}")
    src_path = REF_DB_PATH / source_id
    m_file_name = REF_DB_PATH / source_id / "meta.json"
    with open(m_file_name, "r") as json_file:
        meta = json.load(json_file)
    session.create_source(source_id, channels=["acc"], meta=meta)  # create

    meas = [f.stem for f in Path(src_path).glob("*.csv")]
    print(f"Uploading measurements to source {source_id}")
    for m in tqdm(meas):
        # samples file for one measurement
        TS_MEAS = str(m) + ".csv"  # filename
        TS_MEAS = REF_DB_PATH / source_id / TS_MEAS  # path to file
        ts_df = pd.read_csv(TS_MEAS)  # read csv into df
        accs = ts_df.iloc[:, 0].tolist()  # convert to list

        # meta information file for one measurement
        TS_META = str(m) + ".json"  # filename
        TS_META = REF_DB_PATH / source_id / TS_META  # path
        with open(TS_META, "r") as json_file:  # read json
            meas_info = json.load(json_file)  # into dict

        duration = meas_info['duration']
        meta_info = meas_info['meta']
        meta_info['sampling_rate'] = len(accs)/duration

        try:
            # see mvg for details on this call
            session.create_measurement(sid=source_id,
                                       duration=duration,
                                       timestamp=int(m),
                                       data={"acc": accs},
                                       meta=meta_info)
        except MVGAPIError as exc:
            print(exc)
            raise
Uploading sources u0002, u0003, u0004, u0005, u0006 from CharlieDb
Creating source u0002
Uploading measurements to source u0002
Creating source u0003
Uploading measurements to source u0003
Creating source u0004
Uploading measurements to source u0004
Creating source u0005
Uploading measurements to source u0005
Creating source u0006
Uploading measurements to source u0006
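The loop above could be condensed further by moving the API calls into a function that takes the session as an argument. In the sketch below, upload_source and RecordingSession are hypothetical; the stand-in session only records calls, using the same argument names as the create_source and create_measurement calls in this example, so the logic can be tried without a server.

```python
def upload_source(session, source_id, source_meta, measurements, channel="acc"):
    """Create a source and upload its measurements; return how many were sent.

    measurements maps an integer timestamp to a (samples, duration, meta) tuple.
    """
    session.create_source(source_id, channels=[channel], meta=source_meta)
    for timestamp in sorted(measurements):  # upload in chronological order
        samples, duration, meta = measurements[timestamp]
        session.create_measurement(
            sid=source_id,
            duration=duration,
            timestamp=timestamp,
            data={channel: samples},
            meta=meta,
        )
    return len(measurements)


class RecordingSession:
    """Hypothetical stand-in for the MVG session that only records calls."""

    def __init__(self):
        self.sources = {}
        self.measurements = []

    def create_source(self, source_id, channels, meta):
        self.sources[source_id] = {"channels": channels, "meta": meta}

    def create_measurement(self, sid, duration, timestamp, data, meta):
        self.measurements.append((sid, timestamp, duration, data, meta))


fake = RecordingSession()
sent = upload_source(
    fake,
    "u0002",
    {"location": "demo"},  # made-up meta information
    {20: ([0.1, 0.2], 1.0, {}), 10: ([0.3], 0.5, {})},  # made-up samples
)
print(sent)  # 2
```

Passing the session in as a parameter is a design choice that keeps the upload logic testable; with the real session object the same function would issue the actual API calls.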