Proxies and Multithreading for Requests: Solving the "429 Too Many Requests" Problem in Python

Optimizing Data Retrieval: Proxies and Threads in Action.

Have you ever encountered ‘429 Too Many Requests’ errors when you needed to download a large amount of data from a public API? If so, this article is for you.

I needed to download about 1.5 million posts and 25 million comments, but the API limited me to three requests per second. At that rate, the posts alone would take over 138 hours (1,500,000 requests / 3 per second ≈ 500,000 seconds), so I decided to find a way to speed up the process. The solution was proxies and multithreading.

In this article, I will show you how to speed up data downloads using proxies and multithreading. The code presented here can serve as a cheat sheet for the future.


1. Creating a Data Download Class

class DownloadWorker(object):
    def __init__(self, base_url: str = "https://your-api-endpoint.com/"):
        self.BASE_URL = base_url
        self.MAX_RETRIES = 10
        self.RETRY_DELAY = 1
        # Additional class attributes

Code Summary: This code defines the DownloadWorker class, which will manage the data download process. It initializes the base URL, the number of retry attempts, and the delay between attempts.

2. Preparing a Proxy List

We need a list of proxies through which we will access the resource. You will have to find a reliable provider yourself; I won't name the one I use, so this doesn't turn into an advertisement.

Instruction: Create a proxies.txt file with a list of proxy servers in the following format:

http://username:password@host:port
http://username:password@host:port
http://username:password@host:port
http://username:password@host:port

Summary: In this step, you create a file with the proxy servers that will be used to work around the API's rate limits.

You can store the list in whatever format you prefer: plain text, CSV, JSON, and so on.
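
For example, if you prefer JSON, a minimal loader could look like the sketch below (assuming a hypothetical proxies.json that contains a plain array of proxy URLs):

import json

def load_proxies_json(file_path="proxies.json"):
    # proxies.json is assumed to contain ["http://user:pass@host:port", ...]
    with open(file_path, "r") as f:
        return json.load(f)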

3. Method to Load Proxy List

def load_proxies(self, file_path="proxies.txt"):
    # Read one proxy URL per line, skipping blank lines
    with open(file_path, "r") as f:
        return [line.strip() for line in f if line.strip()]

Code Summary: This method reads the proxy server file and returns a list of proxies.

4. Configuring Retry Strategy

def configure_retry_strategy(self):
    retry_strategy = Retry(
        total=self.MAX_RETRIES,
        backoff_factor=self.RETRY_DELAY,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"]
    )
    return HTTPAdapter(max_retries=retry_strategy)

Code Summary: This method sets up a retry strategy to handle potential communication errors and API limits.
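
To get a feel for what RETRY_DELAY = 1 means as a backoff_factor, here is a rough, standalone sketch of the wait times urllib3 inserts between attempts (the exact treatment of the first retry varies between urllib3 versions, and by default a Retry-After header on a 429 response is honored instead):

# Approximate sleep before retry n: backoff_factor * 2 ** (n - 1)
backoff_factor = 1
for n in range(1, 6):
    print(f"retry {n}: ~{backoff_factor * 2 ** (n - 1)} s")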

5. Method for Safe Data Download

def _fetch_content(self, params: dict, endpoint: str, proxy=None):
    session = requests.Session()
    retry_adapter = self.configure_retry_strategy()
    session.mount("http://", retry_adapter)
    session.mount("https://", retry_adapter)
    full_url = urljoin(self.BASE_URL, endpoint)
    query_string = urlencode(params)
    full_url_with_params = urljoin(full_url, "?" + query_string)

    try:
        if proxy:
            session.proxies = {"http": proxy, "https": proxy}

        response = session.get(full_url_with_params)
        response.raise_for_status()

        return response.json()
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP Error: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")

    return {}

Code Summary: This protected method makes the actual API request using the proxy and the configured retry strategy.
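
A quick sanity check of the method could look like this (a sketch: the "content" endpoint and the id parameter are placeholders for your real API, and proxies.txt is assumed to exist):

worker = DownloadWorker()
proxies = worker.load_proxies()

# Fetch a single item through the first proxy; an empty dict means failure
data = worker._fetch_content(params={"id": 1}, endpoint="content", proxy=proxies[0])
print(data)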

6. Implementing Multithreading

Almost everything is ready; all that's left is the multithreading I mentioned at the beginning of this article. You can swap multithreading for multiprocessing if you need to. Note that in my case I save the results to disk and process them later; that's my choice, and you are free to do it differently.

def process(self, posts: list):
    success_posts = []
    try:
        proxies = self.load_proxies()
        with ThreadPoolExecutor(max_workers=len(proxies)) as executor:
            # Submit all downloads up front so they run in parallel,
            # cycling through the proxy list
            futures = {
                executor.submit(self.download, post.post_id, proxy): post.post_id
                for post, proxy in zip(posts, cycle(proxies))
            }
            for future in as_completed(futures):
                if future.result():
                    success_posts.append(futures[future])

        self.move_files()  # my action with the downloaded data

    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt processing posts.")

    except Exception as e:
        logger.error(f"Exception while processing posts. Error: {e}")

    finally:
        return success_posts

Code Summary: This method implements multithreading for parallel data downloading, significantly speeding up the process.

The cycle function from itertools repeats the proxy list endlessly, so zip pairs each post with the next proxy in turn and stops only when the posts run out. Very handy.
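
A tiny standalone illustration of that pairing:

from itertools import cycle

posts = ["p1", "p2", "p3", "p4", "p5"]
proxies = ["proxy-A", "proxy-B"]

# zip stops when posts runs out; cycle keeps reusing the proxies
print(list(zip(posts, cycle(proxies))))
# [('p1', 'proxy-A'), ('p2', 'proxy-B'), ('p3', 'proxy-A'), ('p4', 'proxy-B'), ('p5', 'proxy-A')]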

7. Assembling the Entire Class

import requests
from requests.adapters import HTTPAdapter, Retry
from itertools import cycle
from urllib.parse import urljoin, urlencode
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
  
logger = logging.getLogger(__name__)

class DownloadWorker(object):
    def __init__(self, base_url: str = "https://your-api-endpoint.com/"):
        self.BASE_URL = base_url
        self.MAX_RETRIES = 10
        self.RETRY_DELAY = 1
        # Additional class attributes as needed

    def load_proxies(self, file_path="proxies.txt"):
        # Read one proxy URL per line, skipping blank lines
        with open(file_path, "r") as f:
            return [line.strip() for line in f if line.strip()]

    def configure_retry_strategy(self):
        retry_strategy = Retry(
            total=self.MAX_RETRIES,
            backoff_factor=self.RETRY_DELAY,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        return HTTPAdapter(max_retries=retry_strategy)

    def _fetch_content(self, params, endpoint, proxy=None):
        session = requests.Session()
        retry_adapter = self.configure_retry_strategy()
        session.mount("http://", retry_adapter)
        session.mount("https://", retry_adapter)
        full_url = urljoin(self.BASE_URL, endpoint)
        query_string = urlencode(params)
        full_url_with_params = urljoin(full_url, "?" + query_string)

        try:
            if proxy:
                session.proxies = {"http": proxy, "https": proxy}
            response = session.get(full_url_with_params)
            response.raise_for_status()
            return response.json()

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")

        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")

        return {}
          
    def process(self, posts: list):
        success_posts = []

        try:
            proxies = self.load_proxies()
            with ThreadPoolExecutor(max_workers=len(proxies)) as executor:
                # Submit all downloads up front so they run in parallel,
                # cycling through the proxy list
                futures = {
                    executor.submit(self.download, post.post_id, proxy): post.post_id
                    for post, proxy in zip(posts, cycle(proxies))
                }
                for future in as_completed(futures):
                    if future.result():
                        success_posts.append(futures[future])

        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt processing posts.")

        except Exception as e:
            logger.error(f"Exception while processing posts. Error: {e}")

        finally:
            return success_posts

    def download(self, post_id, proxy):
        post_id = int(post_id)
        params = {"id": post_id}
        content = self._fetch_content(params=params, endpoint="content", proxy=proxy)
        # Further actions with the downloaded (or empty) data
        return content  # an empty dict signals a failed download
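
To tie everything together, here is a minimal usage sketch (assuming a proxies.txt file next to the script; the Post class is a hypothetical stand-in for whatever objects carry your post IDs):

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    worker = DownloadWorker(base_url="https://your-api-endpoint.com/")

    class Post:  # hypothetical stand-in for real post records
        def __init__(self, post_id):
            self.post_id = post_id

    posts = [Post(i) for i in range(1, 101)]
    downloaded = worker.process(posts)
    print(f"Successfully downloaded {len(downloaded)} posts")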

This article demonstrates how you can effectively use proxies and multithreading to work around API rate limits and accelerate the download of large volumes of data. This approach will significantly reduce the time required for data collection and let you focus on analyzing and using the data.
