From Data Science to Production: Building a Model Service Using Tornado

An easy way to turn a machine learning model into a web service in Python.

Background

In the industry, a data science project should extend beyond an offline machine learning model confined to a Jupyter notebook. AI engineers must deploy their models to ensure user accessibility. The prevalent approach for deployment involves creating a standard web service interface: an Application Programming Interface (API). This API comprises a set of URLs that facilitate model predictions using fresh input data. By adopting this strategy, your model operates as an independent service, seamlessly integrating into complex software or applications.

Deploy an AI model as a web service. Source: by author

Python libraries such as Flask, Tornado, and Django provide web frameworks that make it easy to develop web services. This article introduces the basics of Tornado and walks through a model service from a real-world project.

Solution for building a service

A minimum case

A Tornado web application consists of three parts: tornado.web.RequestHandler objects that execute your backend code to respond to web requests, a tornado.web.Application object that routes requests to the corresponding handlers, and a main function that runs the server. The methods of a RequestHandler correspond to the common HTTP methods, i.e., GET, POST, PUT, DELETE, and so on.

Here is an example of a main.py script that creates a service API returning a "Hello, world" string. The web service starts and waits for incoming web requests once you run the script. To call the API, send a GET request to the server at the URL 127.0.0.1:8888/main, which joins the localhost address 127.0.0.1, the service port 8888, and the routing path main. The asyncio library enables new requests to proceed while existing connections are idle by executing functions in an asynchronous, non-blocking way. You can learn more about asynchronous programming on the asyncio website.

# @File: main.py

import asyncio
import tornado


class MainHandler(tornado.web.RequestHandler):

    async def get(self):
        self.write("Hello, world")


class Application(tornado.web.Application):

    _routes = [
        tornado.web.url(r"/main", MainHandler),  # handle the request "<address>:<port>/main"
    ]

    def __init__(self):
        super(Application, self).__init__(self._routes)


async def main():
    app = Application()
    app.listen(8888)  # start listening for requests on port 8888
    await asyncio.Event().wait()  # keep the server running until interrupted


if __name__ == "__main__":
    asyncio.run(main())
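
Once the script is running, you can test the endpoint from another terminal. The snippet below is a minimal check; it assumes the requests library is installed and that the server is listening locally on the default port 8888:

import requests

# call the "main" endpoint of the locally running service
response = requests.get("http://127.0.0.1:8888/main")
print(response.status_code)  # 200
print(response.text)         # Hello, world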

Get request parameters

Web requests usually carry parameters either in the query string or in the URL path. For example, the request URL 127.0.0.1:8888/query_param?a=xxx&b=yyy routes to 127.0.0.1:8888/query_param with the parameters a=xxx and b=yyy in the query string. Similarly, the request URL 127.0.0.1:8888/path_param/xxx/yyy routes to 127.0.0.1:8888/path_param/xxx/yyy, where xxx and yyy are parameters in the URL path. In the example below, we rewrite the handlers and the Application to demonstrate how to get parameter values. For parameters in the query string, you can access self.request.arguments in the get method of ParamInQueryHandler. For parameters in the URL path, you can read the function arguments of the get method of ParamInPathHandler, which Tornado fills from the capture groups in the route pattern.

class ParamInQueryHandler(tornado.web.RequestHandler):

    async def get(self):
        # get query parameters and decode bytes to string
        query = self.request.arguments
        for key, value in query.items():
            query[key] = str(value[0].decode('utf-8'))
        self.write(query)


class ParamInPathHandler(tornado.web.RequestHandler):

    async def get(self, a, b):
        self.write(f"Params: {a}, {b}")


class Application(tornado.web.Application):

    _routes = [
        tornado.web.url(r"/query_param", ParamInQueryHandler),
        tornado.web.url(r"/path_param/(\w+)/(\w+)", ParamInPathHandler)
    ]

    def __init__(self):
        super(Application, self).__init__(self._routes)
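
Assuming these handlers are registered in the application from the previous example and the server is running on port 8888, the two endpoints can be exercised as follows:

import requests

# parameters in the query string: /query_param?a=xxx&b=yyy
response = requests.get("http://127.0.0.1:8888/query_param", params={"a": "xxx", "b": "yyy"})
print(response.json())  # {'a': 'xxx', 'b': 'yyy'}

# parameters in the URL path: /path_param/xxx/yyy
response = requests.get("http://127.0.0.1:8888/path_param/xxx/yyy")
print(response.text)    # Params: xxx, yyy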

Multi-threading

The asyncio library keeps non-blocking I/O for concurrent requests from holding up one another within a single thread. However, if a handler performs blocking work (for example, a slow model inference or a blocking I/O call), it stalls the event loop and delays every other request. To further increase the service's capacity for handling concurrent requests, you can offload such work to multiple threads. In Tornado, you can create a ThreadPoolExecutor within the handler and decorate the blocking functions with run_on_executor. This way, functions that would otherwise block one another for different requests can run in parallel threads while the event loop stays responsive. The code below demonstrates how to rewrite ParamInPathHandler to support multi-threading.

from tornado.concurrent import run_on_executor
import concurrent.futures
import time

class ParamInPathHandler(tornado.web.RequestHandler):

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    @run_on_executor
    def blocking_task(self):
        # This function will be executed in a thread from the executor pool.
        time.sleep(1)
        return 1

    async def get(self, a, b):
        # This coroutine runs in the main thread; awaiting the executor task
        # does not block the event loop.
        result = await self.blocking_task()
        self.write(f"Params: {a}, {b}; task result: {result}")

Example

Here is a real-world project that uses deep learning models to forecast streamflow for river gauge stations. The project aims to provide a web service that predicts the daily streamflow for the next few days. The main script to start the server is presented below.

Web service code

In our project, we've developed two essential APIs: the "info" API and the "forecast" API. These APIs serve distinct purposes:

  1. Info API: This API provides information about all river sites where our forecasting model can be applied.
  2. Forecast API: Here, we retrieve forecasted streamflow data for a specific river site. The API leverages a deep learning time-series forecasting model, which predicts future streamflow values based on online weather forecast data.

To implement these APIs effectively, we've defined three handlers (see the code block below):

  1. InfoHandler: Responsible for querying site information from our local database using the InfoService object.
  2. ForecastHandler: Utilizes the ForecastService object to feed data into the forecasting model and generate predictions.
  3. HealthHandler: Provides a simple health check that confirms the service is up and reachable.

All three handlers share common functionality. They include two generic functions:

  1. Setting Default Headers: This function sets response headers that allow cross-origin requests, so the API can be called from web pages served on other origins.
  2. Query Parameter Parsing and Execution: This function parses query parameters and executes our data science code.

To streamline our codebase, we've introduced a parent class called BaseHandler. This class implements the generic set_default_headers and _process_get methods, which are inherited by the specialized handlers.

# @File: main_service.py

from service.info_service import InfoService
from service.forecast_service import ForecastService
from config.config_service import ServiceConfig
import asyncio
import tornado
from tornado.concurrent import run_on_executor
import concurrent


class BaseHandler(tornado.web.RequestHandler):

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def set_default_headers(self):
        ...  # set response headers allowing cross-origin requests (implementation omitted)

    @run_on_executor
    def _process_get(self, service):
        # parse query parameters and decode bytes to strings
        query = self.request.arguments
        for key, value in query.items():
            query[key] = str(value[0].decode('utf-8'))
        print(query)
        # run the (potentially slow) data science code in a worker thread
        response = service.execute(query)
        return response


class HealthHandler(BaseHandler):

    async def get(self):
        self.write("OK")


class InfoHandler(BaseHandler):

    async def get(self):
        service = InfoService()
        response = await self._process_get(service)
        self.write(response)


class ForecastHandler(BaseHandler):

    async def get(self):
        service = ForecastService()
        response = await self._process_get(service)
        self.write(response)


class Application(tornado.web.Application):

    _routes = [
        tornado.web.url(r"/healthCheck", HealthHandler),
        tornado.web.url(r"/info", InfoHandler),
        tornado.web.url(r"/forecast", ForecastHandler)
    ]

    def __init__(self):
        super(Application, self).__init__(self._routes)


async def main():
    app = Application()
    app.listen(ServiceConfig.port)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())
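
As a side note, the body of set_default_headers is omitted in the script above. A typical implementation that allows cross-origin requests might look like the sketch below; the exact headers are deployment-specific, so treat this as an assumed example rather than the project's actual configuration:

    # inside BaseHandler
    def set_default_headers(self):
        # allow clients on other origins to call this API (CORS)
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Headers", "Content-Type")
        self.set_header("Access-Control-Allow-Methods", "GET, OPTIONS")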

Keep in mind that the detailed implementation of InfoService and ForecastService lies beyond the scope of this article.
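
For reference, the handlers only assume that each service object exposes an execute method that accepts the parsed query dictionary and returns a JSON-serializable response. A hypothetical skeleton (not the project's actual code) could look like this:

class InfoService:
    """Hypothetical skeleton of the interface expected by BaseHandler._process_get."""

    def execute(self, query: dict) -> dict:
        # look up site metadata in the local database (details omitted here)
        site_info = [
            {"id": "10251335", "latitude": 35.80094444, "longitude": -116.1944167,
             "area": 34.5, "elevation": 1236.59},
        ]
        return {"success": True, "message": "Success.", "data": {"site_info": site_info}}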

Access the web service

When deploying our service on a local PC, we can conveniently access the APIs using the address 127.0.0.1. Here's how we interact with the two APIs:

  • Info API:

To retrieve information about river sites, we call the "info" API from Python. The API responds with JSON, which the requests library parses into a Python dictionary. Below is an example code snippet demonstrating how to call the "info" API:

import requests

url = '127.0.0.1'
port = 8888
endpoint = 'info'
query = {}
response = requests.get(f'http://{url}:{port}/{endpoint}', params=query)
print(response.json())
# {
#     'success': True,
#     'message': 'Success.',
#     'data': {'site_info': [
#         {'id': '10251335', 'latitude': 35.80094444, 'longitude': -116.1944167, 'area': 34.5, 'elevation': 1236.59},
#         {'id': '10258500', 'latitude': 33.74502178, 'longitude': -116.5355709, 'area': 93.1, 'elevation': 700.0},
#         ......
#     ]}
# }
  • Forecast API:

For forecasting results of a specific river site, we invoke the "forecast" API. This API requires two query parameters: site_id and forecast_days. Below is an example code snippet demonstrating how to call the "forecast" API:

import requests

url = '127.0.0.1'
port = 8888
endpoint = 'forecast'
query = {'site_id': '10251335', 'forecast_days': 5}
response = requests.get(f'http://{url}:{port}/{endpoint}', params=query)
print(response.json())
# {
#     'success': True,
#     'message': 'Success.',
#     'data': {
#         'site_id': '10251335',
#         'forecast_days': 5,
#         'forecast': [
#             {'time': '2024-01-13', 'flow': 0.35031596854725167},
#             {'time': '2024-01-14', 'flow': 0.35143999406036575},
#             {'time': '2024-01-15', 'flow': 0.34945296611783816},
#             {'time': '2024-01-16', 'flow': 0.34787518902467607},
#             {'time': '2024-01-17', 'flow': 0.35213189176247556}
#         ]
#     }
# }

The output provides forecasted streamflow data for the specified site over the next five days. Remember to adapt the URLs and parameters according to your specific deployment environment.

Summary

In this article, we dove into the fundamental aspects of the Tornado Python library and covered the following key topics:

  1. Tornado Web Application Structure: Understand the organization of a Tornado web app, including its code structure.
  2. Request Parameter Handling: Learn how to efficiently access request parameters from clients.
  3. Multi-Threading: Explore techniques to enable multi-threading in Tornado applications.

Additionally, we illustrated the typical Tornado framework code for a data science project using a real-world example. Starting with the simple examples provided here will lay a solid foundation for mastering the intricacies of deploying AI models via complex web services in the industry.
