Sources and Measurements Example

This example shows how to upload measurements from an external repository into the MultiViz Analytics Engine (MVG) service and how to read this data.

[1]:
import json
import os
from pathlib import Path

from tqdm.notebook import tqdm
import pandas as pd

# import mvg library with python bindings to mvg-API
from mvg import MVG
from mvg.exceptions import MVGAPIError

Note

Each TOKEN is used both for authorization and authentication. Thus, each unique token represents a unique user, and each user has their own unique database on the VA-MVG service.

You need to insert the token you received from Viking Analytics here: replace "os.environ['TEST_TOKEN']" with your token as a string.

[2]:
ENDPOINT = "https://api.beta.multiviz.com"
# Replace by your own Token
VALID_TOKEN = os.environ['TEST_TOKEN']
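If the environment variable is missing, the lookup above fails with a bare KeyError. The following is a minimal sketch of a friendlier lookup; the helper name read_token and its error message are illustrative assumptions, not part of the mvg library.

```python
import os


def read_token(environ=os.environ, var_name="TEST_TOKEN"):
    """Return the token stored under var_name, or raise a descriptive error.

    read_token is a hypothetical helper for this example only.
    """
    token = environ.get(var_name, "").strip()
    if not token:
        raise RuntimeError(
            f"Environment variable {var_name} is not set; "
            "export it or paste your token as a string instead."
        )
    return token


# Demonstration with a plain dict standing in for the real environment
print(read_token({"TEST_TOKEN": "my-secret-token"}))
```

In the notebook, calling read_token() without arguments would read from the real environment.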

Downloading the Data

For convenience, we take the data from our public charlie repository, https://github.com/vikinganalytics/va-data-charlie.git. Clone that repository to get access to the data.

[ ]:
!git clone https://github.com/vikinganalytics/va-data-charlie.git

We are going to use six of the sources from the charlie dataset with IDs u0001 to u0006.

[3]:
REF_DB_PATH = Path.cwd() / "va-data-charlie" / "charlieDb" / "acc"
SOURCE_IDS = ["u0001", "u0002", "u0003", "u0004", "u0005", "u0006"]
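Before talking to the API, it can be worth checking that the clone succeeded and that each source directory is in place. The sketch below assumes the layout used later in this example (one directory per source ID containing a meta.json file); the helper name missing_source_dirs is hypothetical.

```python
from pathlib import Path


def missing_source_dirs(db_path, source_ids):
    """Return the source IDs that lack a meta.json under db_path/<id>/."""
    db_path = Path(db_path)
    return [
        sid for sid in source_ids
        if not (db_path / sid / "meta.json").is_file()
    ]


# Same path and IDs as in the cell above
db_path = Path.cwd() / "va-data-charlie" / "charlieDb" / "acc"
ids = ["u0001", "u0002", "u0003", "u0004", "u0005", "u0006"]
missing = missing_source_dirs(db_path, ids)
if missing:
    print(f"Missing sources (is the repository cloned?): {missing}")
else:
    print("All source directories found")
```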

Connect to the API

Instantiate a session object with the MVG library. A session object essentially caches the endpoint and the token to simplify the calls to the MVG library.

[4]:
session = MVG(ENDPOINT, VALID_TOKEN)

We now check if the server is alive. The hello message contains, among other things, the API version.

[5]:
session.say_hello()
[5]:
{'name': 'MultiViz Engine API',
 'version': 'v0.3.2',
 'swagger': 'http://api.beta.multiviz.com/docs'}
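If a script relies on a minimum API version, the version string from the hello message can be compared as a tuple of integers. This is a rough sketch that assumes the version always has the form vMAJOR.MINOR.PATCH, as in the output above; parse_version and MIN_VERSION are hypothetical names chosen for illustration.

```python
def parse_version(version):
    """Turn a version string such as 'v0.3.2' into a tuple of ints."""
    return tuple(int(part) for part in version.lstrip("v").split("."))


# Hello message copied from the output above (swagger entry omitted)
hello = {"name": "MultiViz Engine API", "version": "v0.3.2"}

MIN_VERSION = (0, 3, 0)  # hypothetical minimum, for illustration only
is_new_enough = parse_version(hello["version"]) >= MIN_VERSION
print(f"API version ok: {is_new_enough}")
```

Comparing tuples avoids the pitfalls of comparing version strings lexically, where "v0.10.0" would sort before "v0.3.2".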

Check Database

We start by checking whether there are any sources in the database. A source represents a measurement source, e.g., a measurement sensor. Note that a source can also represent a sensor with several channels.

Because we want to start from a clean slate, we first list all sources (potentially none). The source used in this example is deleted in a later step.

[6]:
sources = session.list_sources()

for src in sources:
    # While the list returned contains all information
    # about all known sources, it is also possible
    # to query for a single source by its id
    s_info = session.get_source(src['source_id'])
    print(f"Source info retrieved:\n{s_info}\n")
Source info retrieved:
{'source_id': 'u0001', 'meta': {'assetId': 'assetJ', 'measPoint': 'mloc01', 'location': 'cancun', 'timezone': 'Europe/Stockholm'}, 'properties': {'data_class': 'waveform', 'channels': ['acc']}}

The example below revolves around a source with source ID u0001.

[7]:
SOURCE_ID = SOURCE_IDS[0]

To make sure we start from a clean slate, we delete our source in case it exists.

[8]:
try:
    source = session.get_source(SOURCE_ID)
    session.delete_source(SOURCE_ID)
    print(f"Source {SOURCE_ID} deleted")
except MVGAPIError:
    print(f"Source {SOURCE_ID} does not exist")
Source u0001 deleted

Build measurements

Now we want to (re)build the source and the attached measurements from scratch. In this example, each source has a JSON file with all the information needed to create it.

[9]:
src_path = REF_DB_PATH / SOURCE_ID
m_file_name = REF_DB_PATH / SOURCE_ID / "meta.json"
with open(m_file_name, "r") as json_file:
    meta = json.load(json_file)
print("Creating meta info")
print(meta)
Creating meta info
{'assetId': 'assetA', 'measPoint': 'mloc01', 'location': 'paris'}

Create the source and check for it.

[10]:
session.create_source(SOURCE_ID, channels=["acc"], meta=meta)  # create
source = session.get_source(SOURCE_ID)
print("Recreated source info:")
print(source)
Recreated source info:
{'source_id': 'u0001', 'meta': {'assetId': 'assetA', 'measPoint': 'mloc01', 'location': 'paris'}, 'properties': {'data_class': 'waveform', 'channels': ['acc']}}

Update the source in case it is necessary.

[11]:
meta['updated'] = "YES! I have been updated"
session.update_source(SOURCE_ID, meta)  # update
source = session.get_source(SOURCE_ID)
print("Updated source info")
print(source)
Updated source info
{'source_id': 'u0001', 'meta': {'assetId': 'assetA', 'measPoint': 'mloc01', 'location': 'paris', 'updated': 'YES! I have been updated'}, 'properties': {'data_class': 'waveform', 'channels': ['acc']}}

Upload measurements to the source. Measurements are tied to sources; they consist of

  • an array of floating point values with a header indicating the channel name

  • timestamp when the values were sampled

  • a field for the duration of the measurement

  • meta information to be stored along the measurement
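As a rough illustration, the four parts listed above can be gathered into a single dictionary whose keys mirror the keyword arguments passed to create_measurement later in this example. The helper build_measurement and the sample values are assumptions made for this sketch and are not part of the mvg library.

```python
def build_measurement(source_id, timestamp, duration, samples,
                      meta=None, channel="acc"):
    """Assemble the parts of one measurement into a keyword-argument dict.

    build_measurement is a hypothetical helper for illustration.
    """
    if duration <= 0:
        raise ValueError("duration must be positive")
    return {
        "sid": source_id,
        "duration": duration,
        "timestamp": int(timestamp),
        # samples are stored per channel, keyed by the channel name
        "data": {channel: [float(x) for x in samples]},
        "meta": dict(meta or {}),
    }


# Made-up sample values, only to show the shape of the result
example = build_measurement("u0001", 1570186860, 2.0, [0.1, -0.2, 0.3])
print(sorted(example))
```

With a live session, such a dictionary could be unpacked as session.create_measurement(**example).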

[12]:
meas = [f.stem for f in Path(src_path).glob("*.csv")]

meas now contains the timestamps, taken from the file names, of the measurements in the repository we upload from. We proceed to iterate over all of them.

[13]:
print(f"Uploading measurements to source {SOURCE_ID}")
for m in tqdm(meas):

    # samples file for one measurement
    TS_MEAS = str(m) + ".csv"  # filename
    TS_MEAS = REF_DB_PATH / SOURCE_ID / TS_MEAS  # path to file
    ts_df = pd.read_csv(TS_MEAS)  # read csv into df
    accs = ts_df.iloc[:, 0].tolist()  # convert to list

    # meta information file for one measurement
    TS_META = str(m) + ".json"  # filename
    TS_META = REF_DB_PATH / SOURCE_ID / TS_META  # path
    with open(TS_META, "r") as json_file:  # read json
        meas_info = json.load(json_file)  # into dict

    # get duration and other meta info
    duration = meas_info['duration']
    meta_info = meas_info['meta']
    # add sampling rate, not required by vibration API
    # but may be used on client side
    # in general any information can be stored
    # along the actual samples
    meta_info['sampling_rate'] = len(accs)/duration
    # <<< end of code specific for repo

    try:
        # see mvg for details on this call
        session.create_measurement(sid=SOURCE_ID,
                                   duration=duration,
                                   timestamp=int(m),
                                   data={"acc": accs},
                                   meta=meta_info)
    except MVGAPIError as exc:
        print(exc)
Uploading measurements to source u0001
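Path.glob does not guarantee any particular order, so the loop above visits the files in whatever order the file system returns them. If chronological order matters, the file stems can be sorted numerically first. The sketch below also converts a timestamp to a readable date, assuming the values are Unix epoch seconds (an assumption; the text only says they are timestamps). Apart from 1570186860, the stems are made-up values.

```python
from datetime import datetime, timezone


def sort_timestamps(stems):
    """Convert file stems to integers and sort them chronologically."""
    return sorted(int(stem) for stem in stems)


def to_utc(timestamp):
    """Interpret a timestamp as epoch seconds (assumption) in UTC."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# Illustrative stems; only 1570186860 appears in this example's output
stems = ["1570273260", "1570186860", "1570359660"]
ordered = sort_timestamps(stems)
for ts in ordered:
    print(ts, to_utc(ts).isoformat())
```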

Read the Measurements

Check if we actually created the measurements by reading them.

[14]:
m = session.list_measurements(SOURCE_ID)
print(f"Read {len(m)} stored measurements")
Read 50 stored measurements

It is also possible to read a specific measurement at a single timestamp. Let’s get the timestamp of the first measurement.

[15]:
ts_0 = m[0]['timestamp']
meas_0 = session.read_single_measurement(SOURCE_ID, ts_0)
# we'll print out the returned measurement
print(f"source_id: {SOURCE_ID}")
print(f"timestamp: {ts_0}")
print(f"duration:  {meas_0['duration']}")
print(f"meta:      {meas_0['meta']}")
print(f"data:      {meas_0['data']['acc'][1:3]}...")
source_id: u0001
timestamp: 1570186860
duration:  2.8672073400507907
meta:      {'sampling_rate': 13950.857142857141}
data:      [0.63897705078125, -0.55078125]...
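The sampling_rate in the meta field was computed at upload time as the number of samples divided by the duration. As a quick consistency check, multiplying the two printed values back together should give a whole number of samples. The helper name implied_sample_count is hypothetical.

```python
def implied_sample_count(sampling_rate, duration):
    """Recover the number of samples from sampling rate and duration."""
    return round(sampling_rate * duration)


# Values copied from the output above
duration = 2.8672073400507907
sampling_rate = 13950.857142857141

n_samples = implied_sample_count(sampling_rate, duration)
print(n_samples)  # 40000
```

Comparing this number with the length of the returned data array is a simple way to confirm that the stored meta information and the samples agree.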

We can also update the meta information of a measurement at a single timestamp. We reuse the timestamp of the first measurement.

[16]:
ts_0 = m[0]['timestamp']
new_meta = meas_0['meta']
new_meta['updated'] = "YES!"
session.update_measurement(SOURCE_ID, ts_0, new_meta)
meas_0_u = session.read_single_measurement(SOURCE_ID, ts_0)

# we'll print out the returned measurement
print(f"source_id: {SOURCE_ID}")
print(f"timestamp: {ts_0}")
print(f"duration:  {meas_0_u['duration']}")
print(f"meta:      {meas_0_u['meta']}")
print(f"data:      {meas_0_u['data']['acc'][1:3]}...")
source_id: u0001
timestamp: 1570186860
duration:  2.8672073400507907
meta:      {'sampling_rate': 13950.857142857141, 'updated': 'YES!'}
data:      [0.63897705078125, -0.55078125]...

Finally, we delete the measurement.

[17]:
session.delete_measurement(SOURCE_ID, ts_0)

We check that it is actually deleted.

[18]:
try:
    meas_0 = session.read_single_measurement(SOURCE_ID, ts_0)
except MVGAPIError:
    print("Previously deleted measurement does not exist")

If all went well, the source u0001 is now in the database along with its remaining measurements.

Uploading additional sources

Here we show how to upload additional sources in a more compact way.

[19]:
sources = SOURCE_IDS[1:]

print(f"Uploading sources {', '.join(sources)} from CharlieDb")
for source_id in tqdm(sources):

    try:
        session.delete_source(source_id)
    except MVGAPIError:
        pass  # Source didn't exist

    print(f"Creating source {source_id}")
    src_path = REF_DB_PATH / source_id
    m_file_name = REF_DB_PATH / source_id / "meta.json"
    with open(m_file_name, "r") as json_file:
        meta = json.load(json_file)
    session.create_source(source_id, channels=["acc"], meta=meta)  # create

    meas = [f.stem for f in Path(src_path).glob("*.csv")]

    print(f"Uploading measurements to source {source_id}")
    for m in tqdm(meas):

        # samples file for one measurement
        TS_MEAS = str(m) + ".csv"  # filename
        TS_MEAS = REF_DB_PATH / source_id / TS_MEAS  # path to file
        ts_df = pd.read_csv(TS_MEAS)  # read csv into df
        accs = ts_df.iloc[:, 0].tolist()  # convert to list

        # meta information file for one measurement
        TS_META = str(m) + ".json"  # filename
        TS_META = REF_DB_PATH / source_id / TS_META  # path
        with open(TS_META, "r") as json_file:  # read json
            meas_info = json.load(json_file)  # into dict

        duration = meas_info['duration']
        meta_info = meas_info['meta']
        meta_info['sampling_rate'] = len(accs)/duration

        try:
            # see mvg for details on this call
            session.create_measurement(sid=source_id,
                                       duration=duration,
                                       timestamp=int(m),
                                       data={"acc": accs},
                                       meta=meta_info)
        except MVGAPIError as exc:
            print(exc)
            raise



Uploading sources u0002, u0003, u0004, u0005, u0006 from CharlieDb
Creating source u0002
Uploading measurements to source u0002
Creating source u0003
Uploading measurements to source u0003
Creating source u0004
Uploading measurements to source u0004
Creating source u0005
Uploading measurements to source u0005
Creating source u0006
Uploading measurements to source u0006
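The file-reading steps are identical in both upload loops, so they could be factored into a small helper. The sketch below is one possible way to do that. It uses the standard-library csv module instead of pandas to stay dependency-free, and it assumes the file layout implied above: a CSV file with one header row and the samples in the first column, plus a JSON file with duration and meta keys. load_measurement is a hypothetical name, and the demonstration data is made up.

```python
import csv
import json
import tempfile
from pathlib import Path


def load_measurement(src_dir, stem):
    """Read <stem>.csv and <stem>.json from src_dir.

    Returns (samples, duration, meta) with sampling_rate added to meta.
    """
    src_dir = Path(src_dir)
    with open(src_dir / f"{stem}.csv", newline="") as csv_file:
        reader = csv.reader(csv_file)
        next(reader)  # skip the header row holding the channel name
        samples = [float(row[0]) for row in reader if row]
    with open(src_dir / f"{stem}.json") as json_file:
        info = json.load(json_file)
    meta = dict(info["meta"])
    meta["sampling_rate"] = len(samples) / info["duration"]
    return samples, info["duration"], meta


# Demonstration on made-up data written to a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    (tmp_path / "1570186860.csv").write_text("acc\n0.5\n-0.25\n0.125\n0.0\n")
    (tmp_path / "1570186860.json").write_text(
        json.dumps({"duration": 2.0, "meta": {}})
    )
    samples, duration, meta = load_measurement(tmp_path, "1570186860")

print(len(samples), duration, meta)
```

Each loop body would then reduce to one call to the helper followed by the create_measurement call.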