Async Web Scraping in Python: asyncio + aiohttp (Concurrency Without Getting Banned)

Async scraping is the fastest way to turn a "30-minute crawl" into a "90-second crawl".

It’s also the fastest way to get banned.

Most tutorials stop at:

await asyncio.gather(*tasks)

…and then act surprised when:

  • half your requests return 429
  • sockets hang
  • you accidentally open 2,000 connections

This guide is different.

You’ll build a production-grade async scraper template with:

  • bounded concurrency (global and per-host)
  • timeouts that prevent hung requests
  • retry + exponential backoff + jitter
  • polite rate limiting
  • a place to plug in proxy rotation (ProxiesAPI-friendly)

Add a stable proxy layer with ProxiesAPI

Async scraping amplifies both your speed and your risk of blocks. ProxiesAPI helps you rotate egress cleanly while you keep concurrency bounded and retries polite — a reliable combo for large crawls.


The core idea: bounded concurrency

Concurrency is not the same as throughput.

If you hit a target with 500 parallel requests, you don’t get “500x faster” — you usually get:

  • temporary bans
  • captchas
  • throttling
  • broken HTML

A sane default for many sites is:

  • 5–20 concurrent requests total
  • 2–6 concurrent requests per host

Then scale carefully.
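To make the idea concrete before the full template, here is a minimal, self-contained sketch. No real network calls are made — `fake_fetch` is a stand-in that just sleeps — but it demonstrates the key property: a semaphore caps in-flight work no matter how many tasks you launch.

```python
import asyncio

MAX_CONCURRENCY = 5
in_flight = 0  # requests currently "on the wire"
peak = 0       # highest concurrency we ever observed

async def fake_fetch(sem: asyncio.Semaphore) -> None:
    # Stand-in for a real HTTP request; it only records concurrency.
    global in_flight, peak
    async with sem:
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # simulate network wait
        in_flight -= 1

async def main() -> None:
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    # Launch 50 tasks at once; the semaphore admits at most 5 at a time.
    await asyncio.gather(*(fake_fetch(sem) for _ in range(50)))

asyncio.run(main())
print(f"peak concurrency: {peak}")
```

Fifty tasks are created eagerly, but only five ever run the "request" section at once. The full template below layers a second, per-host semaphore on top of this same mechanism.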


Setup

python -m venv .venv
source .venv/bin/activate
pip install aiohttp aiodns

  • aiohttp = async HTTP client
  • aiodns = faster async DNS resolution (optional but nice)

A complete async scraper template

This script:

  • takes a list of URLs
  • fetches them concurrently
  • writes JSONL results

Step 1: Helpers (timeouts, parsing, backoff)

import asyncio
import json
import random
import time
from dataclasses import dataclass

import aiohttp


def now_ms() -> int:
    return int(time.time() * 1000)


def jitter(base: float) -> float:
    return base + random.random() * base


def is_retryable(status: int) -> bool:
    return status in (429, 500, 502, 503, 504)


@dataclass
class FetchResult:
    url: str
    status: int | None
    ok: bool
    elapsed_ms: int
    error: str | None
    text: str | None

Step 2: A polite async fetch() with retries

async def fetch_text(
    session: aiohttp.ClientSession,
    url: str,
    *,
    timeout_s: float = 25,
    max_retries: int = 5,
    backoff_base_s: float = 1.0,
) -> FetchResult:
    start = now_ms()
    last_error: str | None = None

    for attempt in range(1, max_retries + 1):
        try:
            t = aiohttp.ClientTimeout(total=timeout_s)
            async with session.get(url, timeout=t) as resp:
                status = resp.status

                if is_retryable(status):
                    last_error = f"retryable status {status}"
                    sleep = min(30.0, jitter(backoff_base_s * (2 ** attempt)))
                    await asyncio.sleep(sleep)
                    continue

                text = await resp.text(errors="ignore")

                return FetchResult(
                    url=url,
                    status=status,
                    ok=(200 <= status < 300),
                    elapsed_ms=now_ms() - start,
                    error=None,
                    text=text,
                )

        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            last_error = str(e)
            if attempt < max_retries:
                sleep = min(30.0, jitter(backoff_base_s * (2 ** attempt)))
                await asyncio.sleep(sleep)

    # All retries exhausted
    return FetchResult(
        url=url,
        status=None,
        ok=False,
        elapsed_ms=now_ms() - start,
        error=last_error or "failed",
        text=None,
    )

Concurrency control: global + per-host

Two semaphores give you a lot of safety:

  • a global semaphore (max in-flight total)
  • a per-host semaphore (max in-flight per domain)

from urllib.parse import urlparse


class ConcurrencyLimiter:
    def __init__(self, global_limit: int = 20, per_host_limit: int = 4):
        self.global_sem = asyncio.Semaphore(global_limit)
        self.per_host_limit = per_host_limit
        self.host_sems: dict[str, asyncio.Semaphore] = {}
        self._lock = asyncio.Lock()

    async def _get_host_sem(self, host: str) -> asyncio.Semaphore:
        async with self._lock:
            if host not in self.host_sems:
                self.host_sems[host] = asyncio.Semaphore(self.per_host_limit)
            return self.host_sems[host]

    async def run(self, url: str, coro_fn):
        host = urlparse(url).netloc
        host_sem = await self._get_host_sem(host)

        async with self.global_sem:
            async with host_sem:
                return await coro_fn()

The crawler: schedule tasks without exploding memory

A common mistake is creating 100k tasks at once.

Instead, we’ll use an asyncio.Queue with a fixed number of workers.

async def worker(
    name: str,
    q: asyncio.Queue,
    limiter: ConcurrencyLimiter,
    session: aiohttp.ClientSession,
    out_fp,
):
    while True:
        url = await q.get()
        if url is None:
            q.task_done()
            return

        try:
            async def run_one():
                return await fetch_text(session, url)

            res = await limiter.run(url, run_one)

            # Write JSONL as you go (streaming)
            out_fp.write(json.dumps(res.__dict__, ensure_ascii=False) + "\n")
            out_fp.flush()

            if res.ok:
                print(f"[{name}] {res.status} {res.elapsed_ms}ms {url}")
            else:
                print(f"[{name}] FAIL {res.elapsed_ms}ms {url} ({res.error})")
        finally:
            # Always mark the item done so q.join() can't hang if something raises
            q.task_done()

Full main()

async def main():
    urls = [
        "https://news.ycombinator.com/",
        "https://example.com/",
        # add your targets here
    ]

    limiter = ConcurrencyLimiter(global_limit=20, per_host_limit=4)

    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; ProxiesAPI-GuidesBot/1.0; +https://proxiesapi.com)",
        "Accept-Language": "en-US,en;q=0.9",
    }

    # limit=0 disables the connector's own cap; our ConcurrencyLimiter does the bounding
    conn = aiohttp.TCPConnector(limit=0, ttl_dns_cache=300)

    async with aiohttp.ClientSession(headers=headers, connector=conn) as session:
        q: asyncio.Queue = asyncio.Queue()

        # enqueue
        for u in urls:
            await q.put(u)

        workers = 10
        with open("async_scrape_results.jsonl", "w", encoding="utf-8") as out_fp:
            tasks = [
                asyncio.create_task(worker(f"w{i}", q, limiter, session, out_fp))
                for i in range(workers)
            ]

            # Wait until every URL has been processed
            await q.join()

            # One sentinel per worker tells it to exit
            for _ in range(workers):
                await q.put(None)

            # Workers return after consuming their sentinel
            await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

Proxy rotation patterns (what works)

Async + proxies is where people shoot themselves in the foot.

Good patterns:

1) One proxy per request (rotation)

  • simplest mental model
  • best when the proxy provider gives you a pool

In aiohttp, you can pass a proxy= per request.

async with session.get(url, proxy=proxy_url) as resp:
    ...

Where proxy_url is something you rotate.
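The simplest rotation is a round-robin cycle over your pool. This is a sketch — the `PROXIES` URLs are placeholders; fill them from your provider's pool (e.g. ProxiesAPI):

```python
import itertools

# Placeholder pool — substitute real proxy URLs from your provider
PROXIES = [
    "http://user:pass@proxy1.example.com:8080",
    "http://user:pass@proxy2.example.com:8080",
    "http://user:pass@proxy3.example.com:8080",
]

_rotation = itertools.cycle(PROXIES)

def next_proxy() -> str:
    """Round-robin over the pool; call once per request."""
    return next(_rotation)

# Then, inside your fetch:
#   async with session.get(url, proxy=next_proxy()) as resp:
#       ...
```

`itertools.cycle` never exhausts, so every request gets the next proxy in order and the pool wraps around indefinitely.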

2) One proxy per host (stickiness)

  • reduces “session churn”
  • often lowers block rates

You can map host -> proxy for a time window, then rotate.

3) Bounded concurrency per proxy

If you send 50 concurrent requests through one IP, you’ll still get blocked.

Even with proxies, keep something like:

  • 1–3 concurrent requests per exit IP
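That constraint is just another semaphore layer, keyed by proxy instead of by host. A self-contained sketch (the proxy URLs and `fetch_via` stand-in are illustrative; no real requests are made):

```python
import asyncio
from collections import defaultdict

PER_PROXY_LIMIT = 2  # keep 1–3 concurrent requests per exit IP

# One semaphore per proxy, created lazily on first use
proxy_sems: dict[str, asyncio.Semaphore] = defaultdict(
    lambda: asyncio.Semaphore(PER_PROXY_LIMIT)
)

in_flight: dict[str, int] = defaultdict(int)
peak: dict[str, int] = defaultdict(int)

async def fetch_via(proxy: str) -> None:
    # Stand-in for a real proxied request; records per-proxy concurrency
    async with proxy_sems[proxy]:
        in_flight[proxy] += 1
        peak[proxy] = max(peak[proxy], in_flight[proxy])
        await asyncio.sleep(0.01)  # simulate network wait
        in_flight[proxy] -= 1

async def main() -> None:
    proxies = ["http://p1.example:8080", "http://p2.example:8080"]
    # 20 requests spread across two exit IPs
    await asyncio.gather(*(fetch_via(proxies[i % 2]) for i in range(20)))

asyncio.run(main())
```

Each exit IP handles at most two requests at once, even though ten are queued against it. In the real scraper you would acquire this semaphore inside `limiter.run`, alongside the global and per-host ones.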

Practical tuning checklist

  • Start with global_limit=10 and per_host_limit=2.
  • If you see 429s, decrease concurrency first.
  • If you see timeouts, increase timeout_s and reduce concurrency.
  • Cache aggressively. Don’t re-fetch unchanged pages.

Where ProxiesAPI fits

Async scraping increases your request rate; that’s the whole point.

ProxiesAPI can help when you:

  • crawl many pages per run
  • scrape many sites/tickers/products
  • deploy from shared cloud IPs that get blocked faster

But remember: proxies are not a substitute for good citizenship.

The winning combo is:

  • bounded concurrency
  • retry/backoff with jitter
  • proxy rotation

FAQ

Is aiohttp always faster than requests?

Not literally always, but for I/O-bound scraping across many URLs it usually is, because you overlap the time spent waiting on the network.

Should I use multiprocessing instead?

Only if you’re CPU-bound (heavy parsing/ML). Most scraping is network-bound.

What about Playwright async?

Playwright’s async API is great, but heavier. Use aiohttp for “simple HTML fetch” and Playwright for JS-heavy pages.

Related guides

Google Trends Scraping: API Options and DIY Methods (2026)
Compare official and unofficial ways to fetch Google Trends data, plus a DIY approach with throttling, retries, and proxy rotation for stability.
How to Scrape Google Search Results with Python (Without Getting Blocked)
A practical SERP scraping workflow in Python: handle consent/interstitials, parse organic results defensively, rotate IPs, backoff on blocks, and export clean results. Includes a ProxiesAPI-backed fetch layer.
How to Scrape E-Commerce Websites: A Practical Guide
A practical playbook for ecommerce scraping: category discovery, pagination patterns, product detail extraction, variants, rate limits, retries, and proxy-backed fetching with ProxiesAPI.
Web Scraping with Python: The Complete 2026 Tutorial
A from-scratch, production-minded guide to web scraping in Python: requests + BeautifulSoup, pagination, retries, caching, proxies, and a reusable scraper template.