Skip to content

Develop a Web Connector

This example is for developing a web connector using Python and FastAPI.

You'll implement a simple datasource adapter for the OpenWeather APIs.

The code is given intentionally simple and straightforward in order to focus on the base concepts, not on the implementation.

Requirements

Base concepts

A Web Connector is a data adapter that exposes your data to iGenius Crystal backend.
It's a web service and you only need to implement three endpoints to be able to talk with Crystal:

  • /test_connection: used by iGenius backend services to check the connection between them and the Web Connector, in order to know if the connector is available. In this endpoint you can also test the connection between your Web Connectior and your custom data source. In this way, you can tell to iGenius backend services if your data source is ready to be queried
  • /collections/actions/describe: returns to iGenius backend the list of available collections in the connected datasource. A collection is a structured set of data such as a database table or CSV file
  • /query/actions/execute: returns the result of the query requested by Crystal

Project bootstrap

First of all, you need to bootstrap your project.
The first step is to initialize our project (here we use Poetry)

$ poetry new webconnector

We suggest to move the webconnector directory into a src directory. So, you'll en up with a structure like this:

|- src
    |- webconnector
        |- __init__.py
|- tests
    |- __init__.py
    |- test_webconnector.py
|- poetry.lock
|- pyproject.toml
|- README.rst

Next you can install FastAPI and Uvicorn (we'll need it later to run our webservice) with

$ poetry add fastapi uvicorn

and iGenius Adapters SDK with

$ poetry add igenius-adapters-sdk

and, finally, you can install your project itself

$ poetry install

Endpoint /test_connection

Now you can start to implement our first endpoint: /test_connection. You'll keep it simple, so you'll always return a successful outcome.

Let's start writing the route:

src/webconnector/routers/test_connection.py

import logging
from typing import Union

from fastapi import APIRouter
from pydantic import BaseModel

logger = logging.getLogger("webconnector")
logger.setLevel("INFO")

router = APIRouter()


class CheckConnectionResponse(BaseModel):
    outcome: str


class CheckConnectionFailResponse(CheckConnectionResponse):
    details: str


@router.get(
    "/test_connection",
    responses={
        200: {
            "description": (
                "Successful response, the Web Connector is ready to accept requests."
            )
        }
    },
    response_model=Union[CheckConnectionFailResponse, CheckConnectionResponse],
)
def test_connection():
    logger.info("Checking connection...")

    return {"outcome": "success"}

Now you can define the main FastAPI application.

src/webconnector/app.py

from fastapi import FastAPI

from webconnector.routers import test_connection

app = FastAPI(title="Crystal WebConnector Datasource Adapter")

app.include_router(test_connection.router, tags=["crystal endpoints"])

And the entrypoint of our webservice.

src/main.py

import os

import uvicorn

if __name__ == '__main__':
    host = os.environ.get('WEB_HOST', '0.0.0.0')
    port = int(os.environ.get('WEB_PORT', 8090))
    uvicorn.run(
        'webconnector.app:app',
        host=host,
        port=port,
        log_level='info',
        reload=True
    )

Run /test_connection endpoint

If everything went fine, you can execute your webservice with:

$ poetry run python src/main.py
INFO:     Uvicorn running on http://0.0.0.0:8090 (Press CTRL+C to quit)
INFO:     Started reloader process [44110] using statreload
INFO:     Started server process [44114]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

And you can access your first endpoint at http://127.0.0.1:8090/test_connection with your browser. Here is a sample output with curl:

$ curl -i -X GET http://127.0.0.1:8090/test_connection
HTTP/1.1 200 OK
date: Tue, 05 Jan 2021 11:50:28 GMT
server: uvicorn
content-length: 21
content-type: application/json

{"outcome":"success"}

Endpoint /collections/actions/describe

You'll keep this project very simple, so the describe collections endpoint will expose only a collection.

The schema

Based on OpenWeather APIs, you'll map a Collection named Cities with these attributes:

name type filterable sortable
id categorical true true
name categorical true true
country categorical true true
temp_min numeric true true
temp_max numeric true true

Mapping the schema using iGenius Adapters SDK entities

To describe a collection, you need to use the entities by our iGenius Adapters SDK. You should start from Attribute that will be aggregated into AttributesSchema that will be collected into a Collection.

tc_id_attr = Attribute(
    uid="id",
    type=AttributeType.CATEGORICAL,
    filterable=True,
    sortable=True,
)
tc_name_attr = Attribute(
    uid="name",
    type=AttributeType.CATEGORICAL,
    filterable=True,
    sortable=True,
)
tc_country_attr = Attribute(
    uid="country",
    type=AttributeType.CATEGORICAL,
    filterable=True,
    sortable=True,
)
tc_min_temp_attr = Attribute(
    uid="temp_min",
    type=AttributeType.NUMERIC,
    filterable=True,
    sortable=True,
)
tc_max_temp_attr = Attribute(
    uid="temp_max",
    type=AttributeType.NUMERIC,
    filterable=True,
    sortable=True,
)
cities_schema = AttributesSchema(
    attributes=[
        tc_id_attr,
        tc_name_attr,
        tc_country_attr,
        tc_min_temp_attr,
        tc_max_temp_attr,
    ]
)
top_cities = Collection(
    uid="openweather.cities", attributes_schema=cities_schema
)

The route

The resulting route will be:

src/webconnector/routers/describe_collections.py

import logging
from typing import List

from fastapi import APIRouter
from pydantic import BaseModel
from igenius_adapters_sdk.entities.collection import (
    Collection,
    AttributesSchema,
    Attribute,
    AttributeType,
)

logger = logging.getLogger("webconnector")
logger.setLevel("INFO")

router = APIRouter()


class DescribeCollectionsResponse(BaseModel):
    collections: List[Collection]


class DescribeCollectionsError(Exception):
    pass


@router.post(
    "/collections/actions/describe",
    responses={200: {"description": "Successful response"}},
    response_model=DescribeCollectionsResponse,
)
def describe_collections():
    tc_id_attr = Attribute(
        uid="id",
        type=AttributeType.CATEGORICAL,
        filterable=True,
        sortable=True,
    )
    tc_name_attr = Attribute(
        uid="name",
        type=AttributeType.CATEGORICAL,
        filterable=True,
        sortable=True,
    )
    tc_country_attr = Attribute(
        uid="country",
        type=AttributeType.CATEGORICAL,
        filterable=True,
        sortable=True,
    )
    tc_min_temp_attr = Attribute(
        uid="temp_min",
        type=AttributeType.NUMERIC,
        filterable=True,
        sortable=True,
    )
    tc_max_temp_attr = Attribute(
        uid="temp_max",
        type=AttributeType.NUMERIC,
        filterable=True,
        sortable=True,
    )
    cities_schema = AttributesSchema(
        attributes=[
            tc_id_attr,
            tc_name_attr,
            tc_country_attr,
            tc_min_temp_attr,
            tc_max_temp_attr,
        ]
    )
    top_cities = Collection(
        uid="openweather.cities", attributes_schema=cities_schema
    )
    collections = [top_cities]
    return {
        "collections": collections,
    }

And the router inclusion in app.py.

src/webconnector/app.py

...

app.include_router(test_connection.router, tags=["crystal endpoints"])
app.include_router(describe_collections.router, tags=["crystal endpoints"])

Run /collections/actions/describe endpoint

If everything went fine, you can access your first endpoint at http://127.0.0.1:8090/collections/actions/describe. Here you have an HTTP POST, so we suggest to use a REST client or simply curl with jq:

$ curl -s -X POST http://127.0.0.1:8091/collections/actions/describe | jq
{
  "collections": [
    {
      "uid": "accuweather.top_cities",
      "attributes_schema": {
        "attributes": [
          {
            "uid": "key",
            "type": "crystal.topics.data.attribute-types.categorical",
            "filterable": true,
            "sortable": true
          },
          {
            "uid": "english_name",
            "type": "crystal.topics.data.attribute-types.categorical",
            "filterable": true,
            "sortable": true
          },
          {
            "uid": "country",
            "type": "crystal.topics.data.attribute-types.categorical",
            "filterable": true,
            "sortable": true
          },
          {
            "uid": "temp_min",
            "type": "crystal.topics.data.attribute-types.numeric",
            "filterable": true,
            "sortable": true
          },
          {
            "uid": "temp_max",
            "type": "crystal.topics.data.attribute-types.numeric",
            "filterable": true,
            "sortable": true
          }
        ]
      }
    }
  ]
}

Endpoint /query/actions/execute

Here you'll have a good part of our business logic, since it is the endpoint that will translate the query coming from Crystal to a result set.
In this example, you'll work on a simple request: a list of cities with current min and max temperature in ascending order.

The payload

Let's start with the payload that will be translated into a Query entity.

{
    "query": {
        "from_": {
            "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
            "collection_uid": "openweather.cities"
        },
        "where": null,
        "order_by": [
            {
                "alias": "temp_min",
                "direction": "asc"
            },
            {
                "alias": "temp_max",
                "direction": "asc"
            }
        ],
        "limit": 1000,
        "offset": 0,
        "aggregations": [
            {
                "attribute_uri": {
                    "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
                    "collection_uid": "openweather.cities",
                    "attribute_uid": "id"
                },
                "alias": "id",
                "function_uri": {
                    "function_type": "aggregation",
                    "function_uid": "crystal.topics.data.aggregation.identity",
                    "function_params": null
                }
            },
            {
                "attribute_uri": {
                    "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
                    "collection_uid": "openweather.cities",
                    "attribute_uid": "name"
                },
                "alias": "name",
                "function_uri": {
                    "function_type": "aggregation",
                    "function_uid": "crystal.topics.data.aggregation.identity",
                    "function_params": null
                }
            },
            {
                "attribute_uri": {
                    "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
                    "collection_uid": "openweather.cities",
                    "attribute_uid": "country"
                },
                "alias": "country",
                "function_uri": {
                    "function_type": "aggregation",
                    "function_uid": "crystal.topics.data.aggregation.identity",
                    "function_params": null
                }
            },
            {
                "attribute_uri": {
                    "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
                    "collection_uid": "openweather.cities",
                    "attribute_uid": "temp_min"
                },
                "alias": "temp_min",
                "function_uri": {
                    "function_type": "aggregation",
                    "function_uid": "crystal.topics.data.aggregation.identity",
                    "function_params": null
                }
            },
            {
                "attribute_uri": {
                    "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
                    "collection_uid": "openweather.cities",
                    "attribute_uid": "temp_max"
                },
                "alias": "temp_max",
                "function_uri": {
                    "function_type": "aggregation",
                    "function_uid": "crystal.topics.data.aggregation.identity",
                    "function_params": null
                }
            }
        ]
    }
}

Payload analysis

In this example query, you are requesting all the fields of our table.

crystal sends the authentication key that you have configured on the crystal Self-Service Console as a request header:

{"X-API-Key": 123}

There is one main key at the root of our payload:

  • query: it's the paylod that will be translated automatically to a Query entity by Pydantic

In this payload you can notice the key datasource_uid: it's the UID of the datasource linked to your Web Connector and it's assigned automatically by Crystal. You don't have to parse it.

In this payload you can see the attributes defined as AggregationAttribute. You can play with this example trying adding or removing attributes.
For the purpose of this example, we have only a Collection, so in our code we'll parse only the attribute_uid. Since in our adapter we are also not handling aliases, we'll ignore the alias key.
On the function_uri key you can say that, since here you only have single values and not any particular aggregation, you are handling identities. In Crystal jargon, an identity is a single value.

{
    "attribute_uri": {
        "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
        "collection_uid": "openweather.cities",
        "attribute_uid": "name"
    },
    "alias": "name",
    "function_uri": {
        "function_type": "aggregation",
        "function_uid": "crystal.topics.data.aggregation.identity",
        "function_params": null
    }
}

Additionally, in the payload you can notice that we are requesting min_temp and max_temp values to be displayed in ascending order through a list of OrderByAttribute. Considering that OpenWeather APIs do not cover this functionality, we will have to implement this operation so that it is performed on the final result.

So, to handle correctly a request with this payload, you should implement something like this for your endpoint:

APP_ID should be your own OpenWeather API key, which you can generate with an OpeanWeather free account. The content of city.list.json corresponds to city.list.json.gz after decompression, which can be downloaded from OpenWeather bulk files.

src/webconnector/routers/execute_query.py

import http.client
import json
from typing import Any, Dict, List

from fastapi import APIRouter
from igenius_adapters_sdk.entities.query import Query
from pydantic import BaseModel


APP_ID = "XXXXXX"

router = APIRouter()


class ConnectionParamsRequest(BaseModel):
    token: str


class ExecuteQueryError(Exception):
    pass


class ExecuteQueryResponse(BaseModel):
    records: List[Dict[str, Any]]


@router.post(
    "/query/actions/execute",
    responses={200: {"description": "Successful response"}},
    response_model=ExecuteQueryResponse,
)
def execute_query(
    query: Query,
    x_api_key: str = Header(None),
):
    fields = [
        aggregation.attribute_uri.attribute_uid for aggregation in query.aggregations
    ]

    with open("./assets/city.list.json") as json_file:
        data = [
            {"id": record["id"], "name": record["name"], "country": record["country"]}
            for record in json.load(json_file)[:10]
        ]

    records = []
    for city in data:
        conn = http.client.HTTPSConnection("api.openweathermap.org")
        conn.request("GET", f"/data/2.5/weather?id={city['id']}&appid={APP_ID}", "", {})
        res = conn.getresponse()
        response_data = res.read()
        response = json.loads(response_data.decode("utf-8"))

        complete_record = {
            "id": city["id"],
            "name": city["name"],
            "country": city["country"],
            "temp_min": response["main"]["temp_min"],
            "temp_max": response["main"]["temp_max"],
        }

        record = {key: complete_record[key] for key in fields}
        records.append(record)

    if query.order_by:
        for sorting_option in query.order_by:
            records = sorted(
                records,
                key=lambda k: k[sorting_option.alias],
                reverse=True if sorting_option.direction == "desc" else False,
            )

    return {"records": records}

Now we can add our route to the application:

src/webconnector/app.py

...

app.include_router(test_connection.router, tags=["crystal endpoints"])
app.include_router(describe_collections.router, tags=["crystal endpoints"])
app.include_router(execute_query.router, tags=["crystal endpoints"])

Run /query/actions/execute endpoint

Pay attention to --data-raw: to avoid a verbose command, there's an ellipsis. You must use the complete data structure previously shown in this documentation

$ curl --location --request POST 'http://localhost:8091/query/actions/execute' \
--header 'X-API-Key: 123' \
--header 'Content-Type: application/json' \
--data-raw '{
    "from_": {
        "datasource_uid": "2b55dd12-9e9b-4bc1-967e-be497abdfc1f",
        "collection_uid": "openweather.cities"
    },
    "where": null,
    "order_by": [
        {
            "alias": "temp_min",
            "direction": "asc"
        },
        {
            "alias": "temp_max",
            "direction": "asc"
        }
    ],
    "limit": 1000,
    "offset": 0,
    "aggregations": [
        ...
    ]
}'

Last update: April 30, 2021