Python async: limit concurrent coroutines per second

orville

My use case is the following: I'm using Python 3.8.

I have an async function analyse_doc that is a wrapper for an HTTP request to a web service. I have approximately 1000 docs to analyse as fast as possible. The service allows 15 transactions per second (and not 15 concurrent requests at any given second). So in the first second I can send 15, in the next second I can send 15 again, and so on. If I try to hit the service more than 15 times per second, I get a 429 error, or sometimes a 503/504 error (server is busy…).

My question is: is it possible to implement something in Python that effectively sends 15 requests per second asynchronously, then waits 1 second, then does it again until the queue is empty? Also, some tasks might fail, and those failing tasks might need a rerun at some point.

So far my code is the following (unbounded parallelism… not even a semaphore), but it handles retries.

tasks = {asyncio.create_task(analyse_doc(doc)): doc for doc in documents}
pending = set(tasks)

# Handle retry
while pending:
    # backoff in case of 429 (asyncio.sleep rather than time.sleep, so the event loop isn't blocked)
    await asyncio.sleep(1)

    # run the pending calls concurrently, return when all are completed
    finished, pending = await asyncio.wait(
        pending, return_when=asyncio.ALL_COMPLETED
    )

    # check if a task raised an exception and register it for a new run
    for task in finished:
        doc = tasks[task]

        if task.exception():
            new_task = asyncio.create_task(analyse_doc(doc))
            tasks[new_task] = doc
            pending.add(new_task)
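
In other words, what I'd like is roughly this (just a sketch of the behaviour I'm after, reusing analyse_doc from above; MAX_PER_SECOND and analyse_all are names I'm making up here):

MAX_PER_SECOND = 15  # the service's limit: 15 transactions per second

async def analyse_all(documents):
    queue = list(documents)
    while queue:
        # take at most 15 docs and fire them off concurrently
        batch, queue = queue[:MAX_PER_SECOND], queue[MAX_PER_SECOND:]
        results = await asyncio.gather(
            *(analyse_doc(doc) for doc in batch), return_exceptions=True
        )

        # anything that failed (429/503/504) goes back on the queue for a rerun
        for doc, result in zip(batch, results):
            if isinstance(result, Exception):
                queue.append(doc)

        # wait out the current one-second window before sending the next batch
        await asyncio.sleep(1)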
   
tobyd

You could try adding another sleep task into the mix to drive the request generation. Something like this:

import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    print(f"i am worker {index} and i am starting")
    await asyncio.sleep(index)
    print(f"i am worker {index} and i am done")

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []

    i = 0

    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1

    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    
    await asyncio.gather(*work_in_progress)

    loop.stop()
    
    # closes out the program
    print('done')

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))

        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        
        # every 'tick' we add this task onto the loop again unless there isn't any more to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()
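
For your case you would presumably set CONCURRENT_TASK_LIMIT to 15 and have the worker call your analyse_doc wrapper instead of sleeping. A rough sketch of just those two pieces (analyse_doc and documents are yours, not defined here):

CONCURRENT_TASK_LIMIT = 15  # the service allows 15 transactions per second

# represents the actual work: one HTTP call per document
async def worker_task(doc):
    return await analyse_doc(doc)

# creates the real work to do
def create_tasks():
    for doc in documents:
        work_todo.append(worker_task(doc))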


Updated version with a simulated exception

import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    try:
        print(f"i am worker {index} and i am starting")
        await asyncio.sleep(index)

        if index % 9 == 0:
            print('simulating error')
            raise NotImplementedError("some error happened")

        print(f"i am worker {index} and i am done")
    except Exception:
        # put this work back on the pile (fudge the index so it doesn't throw this time)
        work_todo.append(worker_task(index + 1))
        

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []

    i = 0

    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1

    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    
    await asyncio.gather(*work_in_progress)

    if len(work_todo) > 0:
        loop.create_task(work())
        print('found some retries')
    else:
        loop.stop()
        # closes out the program
        print('done')
    
    

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))

        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        
        # every 'tick' we add this task onto the loop again unless there isn't any more to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()

This just simulates something going wrong and re-queues the failed task. If the error happens after the main work method has finished, it won't get re-queued, so the are_we_done_yet method would need to check for and rerun any failed tasks. This isn't particularly optimal, as it waits for the drain before checking everything else, but it should give you an idea of an implementation.
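
For example, the drain step could gather with return_exceptions=True and put anything that failed back on the pile. This is only a sketch of that idea: it assumes work() also records each scheduled task's original argument in a task_args dict (not shown above), and that worker_task lets the exception propagate instead of catching it itself.

task_args = {}  # hypothetical: work() would store each scheduled task's argument here

async def are_we_done_yet():
    print('draining')

    # return_exceptions=True so one failure doesn't stop the drain
    results = await asyncio.gather(*work_in_progress, return_exceptions=True)

    # put any failed work back on the todo pile using the argument we remembered for it
    for task, result in zip(work_in_progress, results):
        if isinstance(result, Exception):
            work_todo.append(worker_task(task_args[task]))

    # don't re-check these tasks on the next drain, otherwise failures would retry forever
    work_in_progress.clear()

    if len(work_todo) > 0:
        print('found some retries')
        loop.create_task(work())
    else:
        loop.stop()
        print('done')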
