
Python's Tornado framework's asynchronous tasks with AsyncHTTPClient

WBOY
Release: 2016-07-06 13:29:48

High performance server Tornado
Python has many web frameworks, each with its own merits. Just as glory belongs to Greece, so greatness belongs to Rome. Python's elegance, together with the design of WSGI, gives web frameworks one unified interface between application and server. Both Django and Flask applications can be deployed behind gunicorn.

Unlike Django and Flask, Tornado can act either as a WSGI application or as a WSGI server. More often, though, people choose Tornado for its single-process, single-threaded asynchronous network I/O model. High performance is attractive, but many people ask the same question after using it: Tornado claims to be fast, so why can't I feel it in practice?

In fact, the high performance comes from Tornado's asynchronous network I/O, which is built on epoll (kqueue on BSD and macOS). Because Tornado runs in a single thread, it is easy to accidentally write code that blocks the whole service. Instead of improving performance, such code causes a sharp drop. It is therefore worth exploring how to use Tornado asynchronously.
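
To make the blocking problem concrete, here is a minimal sketch. It assumes Python 3 and uses the standard library's asyncio event loop as a stand-in for Tornado's IOLoop (Tornado 5 and later run on top of asyncio); the function names are made up for illustration.

```python
import asyncio
import time


async def blocking_task(delay):
    # time.sleep() holds the only thread, so no other task can run meanwhile.
    time.sleep(delay)


async def cooperative_task(delay):
    # asyncio.sleep() hands control back to the event loop while waiting.
    await asyncio.sleep(delay)


async def run_three(task, delay=0.1):
    """Start three tasks concurrently and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(task(delay) for _ in range(3)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(run_three(blocking_task))
cooperative_elapsed = asyncio.run(run_three(cooperative_task))
print("blocking: %.2fs, cooperative: %.2fs" % (blocking_elapsed, cooperative_elapsed))
```

The three blocking tasks run one after another, while the three cooperative tasks overlap their waiting time. This is the same effect the benchmarks in this article measure.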

How to use Tornado asynchronously
In short, asynchronous Tornado has two sides: the asynchronous server and the asynchronous client. On either side, the programming model is one of two styles, callbacks or coroutines. The two sides are not cleanly separated in practice: a request handler often makes asynchronous client requests to other services.
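
The difference between the two styles can be sketched without Tornado. The example below assumes Python 3 and asyncio (await plays the role that yield plays in a Tornado coroutine); compute, callback_style and coroutine_style are hypothetical names.

```python
import asyncio


async def compute():
    # Stand-in for any asynchronous operation that produces a value.
    await asyncio.sleep(0.01)
    return 42


async def callback_style():
    received = []
    task = asyncio.ensure_future(compute())
    # Callback style: register a function that runs once the result is ready.
    task.add_done_callback(lambda fut: received.append(fut.result()))
    await task
    await asyncio.sleep(0)  # give any pending callback a chance to run
    return received


async def coroutine_style():
    # Coroutine style: suspend here and resume with the result.
    return await compute()


print(asyncio.run(callback_style()))
print(asyncio.run(coroutine_style()))
```

In the callback style the result only exists inside the callback; in the coroutine style it comes back to the caller as an ordinary return value.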

Server-side asynchronous mode
Server-side asynchrony comes into play when a Tornado request has to perform a time-consuming task. Written directly into the business logic, such a task may block the entire service, so it should be handled asynchronously. There are two ways to do this: suspend the function with yield, or hand the work to a thread pool. First, a synchronous example:

import os

import tornado.web

class SyncHandler(tornado.web.RequestHandler):

  def get(self, *args, **kwargs):
    # Time-consuming, blocking call: nothing else is served until it returns
    os.system("ping -c 2 www.google.com")
    self.finish('It works')


Test it using ab:

ab -c 5 -n 5 http://127.0.0.1:5000/sync


Server Software:    TornadoServer/4.3
Server Hostname:    127.0.0.1
Server Port:      5000

Document Path:     /sync
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  5.076 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  0.99 [#/sec] (mean)
Time per request:    5076.015 [ms] (mean)
Time per request:    1015.203 [ms] (mean, across all concurrent requests)
Transfer rate:     0.19 [Kbytes/sec] received


The qps is a pitiful 0.99. Each ping blocks the only thread, so the five requests are served one after another, roughly one per second.

The following is the asynchronous method:

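import datetime
import functools

class AsyncHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    # A bare number is an absolute time on the IOLoop clock, so use a
    # timedelta to schedule ping one second from now
    tornado.ioloop.IOLoop.instance().add_timeout(
      datetime.timedelta(seconds=1),
      callback=functools.partial(self.ping, 'www.google.com'))

    # do something else

    self.finish('It works')

  @tornado.gen.coroutine
  def ping(self, url):
    # Still a blocking call; it runs on the IOLoop thread after the response is sent
    os.system("ping -c 2 {}".format(url))
    return 'after'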


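Although the ping is scheduled to run later through add_timeout, the handler itself returns almost immediately. Bear in mind that os.system still runs on the IOLoop thread and blocks it while the ping executes; the numbers look good because the response is sent before the ping starts. The ab load test gives: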

Document Path:     /async
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.009 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  556.92 [#/sec] (mean)
Time per request:    8.978 [ms] (mean)
Time per request:    1.796 [ms] (mean, across all concurrent requests)
Transfer rate:     107.14 [Kbytes/sec] received


This approach uses Tornado's IOLoop to push the time-consuming task into the background, so the request can go on with other work. Often, however, the handler needs the result of the time-consuming task, and then this approach no longer works. There is always a way out, though: switch to another asynchronous style. The handler below is rewritten with coroutines:

class AsyncTaskHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):
    # yield the coroutine's future and resume with its result
    response = yield self.ping('www.google.com')
    print('response: {}'.format(response))
    self.finish('hello')

  @tornado.gen.coroutine
  def ping(self, url):
    # os.system is a blocking call; it occupies the IOLoop thread while it runs
    os.system("ping -c 2 {}".format(url))
    return 'after'


The task is now handled through a coroutine, and its return value is available to the handler.

Server Software:    TornadoServer/4.3
Server Hostname:    127.0.0.1
Server Port:      5000

Document Path:     /async/task
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.049 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  101.39 [#/sec] (mean)
Time per request:    49.314 [ms] (mean)
Time per request:    9.863 [ms] (mean, across all concurrent requests)
Transfer rate:     19.51 [Kbytes/sec] received


The improvement in qps is still obvious. That said, coroutines are not always faster than synchronous code: at low concurrency the time spent waiting on I/O is small, so the two can perform about the same. It is like racing Bolt: over 100 meters you will certainly lose, but over 2 meters the outcome is anyone's guess.

Suspending a coroutine with yield does not block the main thread, but because the handler needs the return value, each individual request still has to wait for the task to finish before it can respond. Another way to combine asynchrony and coroutines is to run the task in a thread pool outside the main thread. The thread pool comes from the futures package (concurrent.futures), which must be installed separately on Python 2.
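
The idea of moving blocking work off the event-loop thread can be sketched as follows. This is not Tornado's run_on_executor: it assumes Python 3 and uses asyncio's run_in_executor as the standard-library counterpart, and slow_job is a hypothetical stand-in for the ping call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def slow_job(delay):
    # Stand-in for a blocking call such as os.system("ping ...").
    time.sleep(delay)
    return 'after'


async def handle():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Three blocking jobs run in worker threads; the event loop stays free.
    results = await asyncio.gather(
        *(loop.run_in_executor(executor, slow_job, 0.1) for _ in range(3)))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(handle())
print(results, round(elapsed, 2))
```

Because the jobs sleep in separate worker threads, the total time stays close to a single job's duration rather than the sum of all three.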

The following handler moves the blocking call into a thread pool:

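from concurrent.futures import ThreadPoolExecutor

import tornado.concurrent

class FutureHandler(tornado.web.RequestHandler):
  # run_on_executor looks up the pool through the "executor" attribute
  executor = ThreadPoolExecutor(10)

  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    url = 'www.google.com'
    # Schedule ping; the decorated method hands the work to a pool thread
    tornado.ioloop.IOLoop.instance().add_callback(functools.partial(self.ping, url))
    self.finish('It works')

  @tornado.concurrent.run_on_executor
  def ping(self, url):
    # Runs in a worker thread, so the IOLoop thread is not blocked
    os.system("ping -c 2 {}".format(url))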


Run the ab test again:

Document Path:     /future
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.003 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   995 bytes
HTML transferred:    25 bytes
Requests per second:  1912.78 [#/sec] (mean)
Time per request:    2.614 [ms] (mean)
Time per request:    0.523 [ms] (mean, across all concurrent requests)
Transfer rate:     371.72 [Kbytes/sec] received


The qps instantly reaches 1912.78, and the server log shows that the ping output keeps arriving after the responses have been sent.
Getting a return value is also easy with a different interface: the with_timeout function in Tornado's gen module (it requires a Tornado version newer than 3.2).

class Executor(ThreadPoolExecutor):
  _instance = None

  def __new__(cls, *args, **kwargs):
    # Share one pool of 10 workers across the whole process
    if not getattr(cls, '_instance', None):
      cls._instance = ThreadPoolExecutor(max_workers=10)
    return cls._instance


class FutureResponseHandler(tornado.web.RequestHandler):
  executor = Executor()

  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    # run_on_executor already submits ping to self.executor and returns a future
    future = self.ping('www.google.com')

    # timedelta(10) would mean 10 days, so name the unit explicitly
    response = yield tornado.gen.with_timeout(datetime.timedelta(seconds=10), future,
                         quiet_exceptions=tornado.gen.TimeoutError)

    if response:
      print('response: {}'.format(response))

  @tornado.concurrent.run_on_executor
  def ping(self, url):
    os.system("ping -c 1 {}".format(url))
    return 'after'


The thread pool approach can also be combined with a coroutine: yield suspends the handler until the pooled task is done, so the result of the time-consuming task is obtained without blocking the main thread.
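
One detail worth checking when building the timeout passed to with_timeout: the first positional argument of datetime.timedelta is days, not seconds. A quick check with the standard library:

```python
import datetime

ten_days = datetime.timedelta(10)             # first positional argument is days
ten_seconds = datetime.timedelta(seconds=10)  # explicit keyword gives seconds

print(ten_days.total_seconds())     # 864000.0
print(ten_seconds.total_seconds())  # 10.0
```

Naming the unit with a keyword argument avoids a timeout that is, in effect, never reached.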

Concurrency Level:   5
Time taken for tests:  0.043 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   960 bytes
HTML transferred:    0 bytes
Requests per second:  116.38 [#/sec] (mean)
Time per request:    42.961 [ms] (mean)
Time per request:    8.592 [ms] (mean, across all concurrent requests)
Transfer rate:     21.82 [Kbytes/sec] received

The qps is 116: with the yield coroutine, throughput is only about one-sixteenth of the version that does not wait for a response. That looks like a large performance loss, but the main reason is simply that the coroutine has to wait for the task to finish before it can return the result.

It is like fishing with a net. In the former approach you cast the net and walk away without caring about the catch, so of course it is fast. In the latter you cast the net and also have to haul it in, which takes a while. Either way, it is still more than a hundred times faster than the synchronous version; casting a net beats catching fish one by one.

Which approach to use depends mostly on the business requirements. Tasks whose result is not needed are usually handled with callbacks, and too many callbacks easily become confusing; if you find yourself nesting many of them, the first thing to simplify is the business or product logic. The yield style is elegant: the code is asynchronous but reads as if it were synchronous. The price is that each request waits for its result, which costs some throughput.

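Other ways to go asynchronous
That is roughly how Tornado handles asynchronous work on the server side. There are also many other frameworks and libraries for asynchronous processing: with the help of Redis or Celery, for example, parts of the business logic in Tornado can be made asynchronous and run in the background.

Tornado also has an asynchronous client, which mainly means using AsyncHTTPClient. The typical scenario is a Tornado service that needs to make requests to other services and process their responses. Incidentally, calling ping in the examples above is also a kind of I/O inside the service. Next, we will explore AsyncHTTPClient, in particular uploading files and forwarding requests with it.

Asynchronous client
The previous sections covered the common ways of handling asynchronous tasks in Tornado, which we can loosely call the asynchronous server. Within our services we usually also need to call third-party services asynchronously. For HTTP requests, Requests is the most pleasant Python library to use, bar none; its website advertises it as "HTTP for Humans". Using requests directly inside Tornado, however, is a nightmare: each request blocks the entire service process.

When God closes a door, he often opens a window. Tornado ships its own asynchronous HTTP client, AsyncHTTPClient (a synchronous client is available too).

Basic usage of AsyncHTTPClient
AsyncHTTPClient is the asynchronous HTTP client provided by tornado.httpclient, and it is fairly simple to use. As on the server side, it can be used in two ways, with a callback or with yield. With a callback, the call does not hand the result back to the caller; with yield, the handler receives the response.

If the third-party service is requested synchronously, performance suffers just as badly.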

import requests

class SyncHandler(tornado.web.RequestHandler):
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    # requests.get blocks the IOLoop thread until the response arrives
    resp = requests.get(url)
    print(resp.status_code)

    self.finish('It works')


The ab test gives roughly the following:

Document Path:     /sync
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  10.255 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  0.49 [#/sec] (mean)
Time per request:    10255.051 [ms] (mean)
Time per request:    2051.010 [ms] (mean, across all concurrent requests)
Transfer rate:     0.09 [Kbytes/sec] received


That is quite slow. Now switch to AsyncHTTPClient and test again:

class AsyncHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    http_client = tornado.httpclient.AsyncHTTPClient()
    # fetch returns immediately; on_response is called when the response arrives
    http_client.fetch(url, self.on_response)
    self.finish('It works')

  def on_response(self, response):
    print(response.code)


The qps improves a great deal:

Document Path:     /async
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.162 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  30.92 [#/sec] (mean)
Time per request:    161.714 [ms] (mean)
Time per request:    32.343 [ms] (mean, across all concurrent requests)
Transfer rate:     5.95 [Kbytes/sec] received


Likewise, to get hold of the response, all that is needed is to yield the call.

class AsyncResponseHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    http_client = tornado.httpclient.AsyncHTTPClient()
    # fetch returns a future; yield suspends the handler until it resolves
    response = yield http_client.fetch(url)
    print(response.code)
    print(response.body)


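Forwarding with AsyncHTTPClient
Tornado is often used to build forwarding services, which rely on AsyncHTTPClient. Forwarding is never limited to GET: POST, PUT, DELETE and other methods appear as well. That brings headers and bodies into play, and sometimes HTTPS warnings.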
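
When a forwarded request carries a new body, some incoming headers no longer describe it: Content-Length refers to the original body, and Host names this service rather than the upstream one. A minimal sketch of filtering them out first follows; forwardable_headers and the exact set of skipped names are assumptions for illustration, and the function works on any mapping of header names to values.

```python
# Headers that describe the incoming connection or body rather than the
# forwarded request; this particular set is an assumption for illustration.
SKIPPED_HEADERS = {'host', 'content-length', 'transfer-encoding', 'connection'}


def forwardable_headers(incoming):
    """Return a copy of the headers without the entries listed above."""
    return {name: value for name, value in incoming.items()
            if name.lower() not in SKIPPED_HEADERS}


incoming = {
    'Host': 'my-service.example',
    'Content-Length': '2',
    'Authorization': 'token abc',
    'Accept': 'application/json',
}
print(forwardable_headers(incoming))
```

The HTTP client can then compute Host and Content-Length for the outgoing request itself.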

Here is a POST example that yields the result. As a rule, a handler that uses yield must be decorated with tornado.gen.coroutine.

headers = self.request.headers
body = json.dumps({'name': 'rsj217'})
http_client = tornado.httpclient.AsyncHTTPClient()

# validate_cert=False skips TLS certificate verification
resp = yield http_client.fetch(
  url,
  method="POST",
  headers=headers,
  body=body,
  validate_cert=False)


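Building requests with AsyncHTTPClient
If the business logic lives outside the handlers, somewhere tornado.gen.coroutine cannot be used directly, you can build the request yourself and use the callback style.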

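body = urllib.urlencode(params)
req = tornado.httpclient.HTTPRequest(
  url=url,
  method='POST',
  body=body,
  validate_cert=False)

http_client = tornado.httpclient.AsyncHTTPClient()
http_client.fetch(req, self.handler_response)

def handler_response(self, response):
  # Called by the IOLoop once the response (or an error) is available
  print(response.code)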


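Usage is straightforward. The first argument of AsyncHTTPClient's fetch method can be an HTTPRequest instance, so request-related parameters such as method and body can be set by building an HTTPRequest first and passing it to fetch. When forwarding with validate_cert enabled, the request may come back with code 599 (the code Tornado uses when no HTTP response was received, for example on a timeout). It is only a warning, and the official position is that this behavior is reasonable.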
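
The same "build the request object first, then hand it to the client" pattern exists in the Python 3 standard library, which also shows where urlencode lives on Python 3 (urllib.parse rather than urllib). This sketch uses urllib.request.Request in place of Tornado's HTTPRequest and sends nothing over the network; the URL and form fields are placeholders.

```python
from urllib.parse import urlencode
from urllib.request import Request

params = {'name': 'rsj217', 'page': 1}    # hypothetical form fields
body = urlencode(params).encode('utf-8')  # request bodies are bytes in Python 3

req = Request('https://api.example.com/items',  # placeholder URL
              data=body, method='POST')

print(req.get_method(), req.full_url, req.data)
```

Keeping the method, body and URL in one object makes the request easy to build in one place and send from another.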

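Uploading images with AsyncHTTPClient
A more advanced use of AsyncHTTPClient is uploading images. Suppose the service calls a third-party OCR service: the image uploaded by the user has to be forwarded to that third party.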

@router.Route('/api/v2/account/upload')
class ApiAccountUploadHandler(helper.BaseHandler):
  @tornado.gen.coroutine
  @helper.token_require
  def post(self, *args, **kwargs):
    upload_type = self.get_argument('type', None)

    # request.files maps each field name to a list of uploaded files
    file_ = self.request.files['file'][0]

    new_file = 'upload/new_pic.jpg'
    new_file_name = 'new_pic.jpg'

    # Write the uploaded bytes to disk (binary mode)
    with open(new_file, 'wb') as w:
      w.write(file_['body'])

    # user_id is not defined in this excerpt; presumably the author's helpers provide it
    logging.info('user {} upload {}'.format(user_id, new_file_name))

    # Upload the image with an asynchronous request
    with open(new_file, 'rb') as f:
      files = [('image', new_file_name, f.read())]

    fields = (('api_key', KEY), ('api_secret', SECRET))

    content_type, body = encode_multipart_formdata(fields, files)

    headers = {"Content-Type": content_type, 'content-length': str(len(body))}
    request = tornado.httpclient.HTTPRequest(config.OCR_HOST,
                         method="POST", headers=headers, body=body, validate_cert=False)

    response = yield tornado.httpclient.AsyncHTTPClient().fetch(request)

def encode_multipart_formdata(fields, files):
  """
  fields is a sequence of (name, value) elements for regular form fields.
  files is a sequence of (name, filename, value) elements for data to be
  uploaded as files.
  Return (content_type, body) ready for httplib.HTTP instance
  """
  boundary = '----------ThIs_Is_tHe_bouNdaRY_$'
  crlf = '\r\n'
  l = []
  for (key, value) in fields:
    l.append('--' + boundary)
    l.append('Content-Disposition: form-data; name="%s"' % key)
    l.append('')
    l.append(value)
  for (key, filename, value) in files:
    filename = filename.encode("utf8")
    l.append('--' + boundary)
    l.append(
        'Content-Disposition: form-data; name="%s"; filename="%s"' % (
          key, filename
        )
    )
    l.append('Content-Type: %s' % get_content_type(filename))
    l.append('')
    l.append(value)
  l.append('--' + boundary + '--')
  l.append('')
  body = crlf.join(l)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body


def get_content_type(filename):
  import mimetypes

  return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
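
The helper above joins str values, which works on Python 2 where str is a byte string. On Python 3, text and binary image data cannot be mixed this way, so a port would have to build the body as bytes. The following is one possible sketch under that assumption; encode_multipart is a hypothetical name, and the random boundary is a design choice rather than part of the original helper.

```python
import mimetypes
import uuid


def encode_multipart(fields, files):
    """Encode form fields and files as multipart/form-data.

    fields: iterable of (name, value) text pairs.
    files: iterable of (name, filename, content_bytes) triples.
    Returns (content_type, body_bytes).
    """
    boundary = uuid.uuid4().hex  # random boundary, unlikely to occur in the data
    marker = b'--' + boundary.encode('ascii')
    lines = []
    for name, value in fields:
        lines += [
            marker,
            ('Content-Disposition: form-data; name="%s"' % name).encode('utf-8'),
            b'',
            value.encode('utf-8'),
        ]
    for name, filename, content in files:
        mime = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        lines += [
            marker,
            ('Content-Disposition: form-data; name="%s"; filename="%s"'
             % (name, filename)).encode('utf-8'),
            ('Content-Type: %s' % mime).encode('ascii'),
            b'',
            content,  # raw bytes, never decoded
        ]
    lines += [marker + b'--', b'']
    body = b'\r\n'.join(lines)
    return 'multipart/form-data; boundary=%s' % boundary, body


content_type, body = encode_multipart(
    [('api_key', 'KEY')],
    [('image', 'new_pic.jpg', b'\xff\xd8\xff\xe0 fake jpeg bytes')])
print(content_type, len(body))
```

The returned body can be passed as the request body, with content_type used as the Content-Type header.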


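Compared with the earlier usage, uploading an image only adds one step: encoding the image. The binary image data is encoded as multipart/form-data, and the accompanying form fields have to be encoded along with it. With requests, by contrast, the same upload is very simple: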

files = {}
# Open in binary mode so the image bytes are sent unchanged
f = open('/Users/ghost/Desktop/id.jpg', 'rb')
files['image'] = f
data = dict(api_key='KEY', api_secret='SECRET')
resp = requests.post(url, data=data, files=files)
f.close()
print(resp.status_code)

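Summary
With AsyncHTTPClient, a handler can easily make requests to third-party services. Combined with the server-side techniques described earlier, it still comes down to two options, the callback style or the yield style, and the deciding question is whether the result is needed. And when several functions each use yield, the yield can be passed along from one to the next. Tornado's tornado.auth module uses this feature for OAuth authentication.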
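
Passing the wait along a chain of functions can be sketched as follows. The example assumes Python 3, where await is the counterpart of yield inside a Tornado coroutine; all three function names are hypothetical.

```python
import asyncio


async def fetch_status():
    # Stand-in for an asynchronous call such as an HTTP fetch.
    await asyncio.sleep(0.01)
    return 200


async def check_service():
    # Waits on the inner coroutine, then builds on its result.
    status = await fetch_status()
    return status == 200


async def handler():
    # The outermost caller waits in turn, so the chain reads top to bottom.
    ok = await check_service()
    return 'It works' if ok else 'unavailable'


print(asyncio.run(handler()))
```

Each level suspends until the level below it has a result, so no callback has to be threaded through the call chain.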

That is roughly how it is used.

source:php.cn