Have you ever encountered ‘429 Too Many Requests’ errors when you needed to download a large amount of data from a public API? If so, this article is for you.
I needed to download about 1.5 million posts and 25 million comments, but the API limited me to three requests per second. After calculating that the posts alone would take over 138 hours, I decided to find a way to speed up the process. The solution was proxies and multithreading.
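The arithmetic behind that estimate: 1,500,000 posts at 3 requests per second is 500,000 seconds, or roughly 139 hours, and that is before touching the 25 million comments.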
In this article, I will show you how to speed up data downloads using proxies and multithreading. The code presented here can serve as a cheat sheet for the future.
1. Creating a Data Download Class
class DownloadWorker(object):
    def __init__(self, base_url: str = "https://your-api-endpoint.com/"):
        self.BASE_URL = base_url
        self.MAX_RETRIES = 10
        self.RETRY_DELAY = 1
        # Additional class attributes
Code Summary: This code defines the DownloadWorker class, which will manage the data download process. It initializes the base URL, the number of retry attempts, and the delay between attempts.
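For orientation, here is a minimal usage sketch; the endpoint is the same placeholder used above, not a real API:

worker = DownloadWorker(base_url="https://your-api-endpoint.com/")
print(worker.MAX_RETRIES, worker.RETRY_DELAY)  # 10 retries, backoff factor of 1 second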
2. Preparing a Proxy List
We need a list of proxies through which we will access the resource. You will have to find a reliable provider yourself; I won't name the one I use, so this doesn't turn into an advertisement.
Instruction: Create a proxies.txt file with a list of proxy servers in the following format:
http://username:password@host:port
http://username:password@host:port
http://username:password@host:port
http://username:password@host:port
Summary: In this step, you create a file with proxy servers that will be used to circumvent API restrictions.
You can store the list in any format you are comfortable with: plain text, CSV, JSON, or whatever you prefer.
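Before handing the list to the downloader, it can be worth checking that each proxy actually responds. Below is a minimal sketch of such a check; the test URL (httpbin.org/ip) and the five-second timeout are my own assumptions, not part of the original setup:

import requests

def check_proxy(proxy: str, test_url: str = "https://httpbin.org/ip", timeout: int = 5) -> bool:
    # Returns True if the proxy answers a simple GET within the timeout.
    try:
        response = requests.get(test_url, proxies={"http": proxy, "https": proxy}, timeout=timeout)
        return response.ok
    except requests.RequestException:
        return False

# Keep only the proxies that respond.
with open("proxies.txt") as f:
    live_proxies = [p.strip() for p in f if p.strip() and check_proxy(p.strip())]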
3. Method to Load Proxy List
def load_proxies(self, file_path="proxies.txt"):
    # Read one proxy URL per line, skipping blank lines.
    with open(file_path, "r") as f:
        return [line.strip() for line in f if line.strip()]
Code Summary: This method reads the proxy server file and returns a list of proxies.
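If you prefer to keep the proxies in JSON rather than plain text, as mentioned above, a variant might look like this; the "proxies" key is my own assumption about how you would structure the file:

import json

def load_proxies_json(self, file_path="proxies.json"):
    # Expects a file like: {"proxies": ["http://user:pass@host:port", ...]}
    with open(file_path, "r") as f:
        return json.load(f)["proxies"]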
4. Configuring Retry Strategy
def configure_retry_strategy(self):
    retry_strategy = Retry(
        total=self.MAX_RETRIES,
        backoff_factor=self.RETRY_DELAY,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"]
    )
    return HTTPAdapter(max_retries=retry_strategy)
Code Summary: This method sets up a retry strategy to handle communication errors and API limits. With backoff_factor equal to RETRY_DELAY, the wait between attempts grows roughly exponentially for retryable statuses such as 429, so a temporary rate-limit hit is retried rather than failing the download outright.
5. Method for Safe Data Download
def _fetch_content(self, params: dict, endpoint: str, proxy=None):
    session = requests.Session()
    retry_adapter = self.configure_retry_strategy()
    session.mount("http://", retry_adapter)
    session.mount("https://", retry_adapter)
    full_url = urljoin(self.BASE_URL, endpoint)
    query_string = urlencode(params)
    full_url_with_params = urljoin(full_url, "?" + query_string)
    try:
        if proxy:
            session.proxies = {"http": proxy, "https": proxy}
        response = session.get(full_url_with_params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP Error: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
    return {}
Code Summary: This protected method makes the actual API request using the proxy and the configured retry strategy.
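To make the call pattern concrete, here is how this method might be invoked; the "content" endpoint and the "id" parameter are placeholders that match the download method shown later:

worker = DownloadWorker()
proxy = "http://username:password@host:port"
data = worker._fetch_content(params={"id": 12345}, endpoint="content", proxy=proxy)
# data is the parsed JSON payload, or {} if the request ultimately failed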
6. Implementing Multithreading
Almost everything is ready. All that is left is to add the multithreading I mentioned at the beginning of this article. You can replace multithreading with multiprocessing if you see the need for it. Note that in my case I save the results to disk and process them afterwards; that is my choice, and you can do as you wish.
def process(self, posts: list):
    success_posts = []
    try:
        proxies = self.load_proxies()
        with ThreadPoolExecutor(max_workers=len(proxies)) as executor:
            # Submit all downloads first, then collect results as threads finish,
            # so the work actually runs in parallel.
            futures = {
                executor.submit(self.download, post.post_id, proxy): post.post_id
                for post, proxy in zip(posts, cycle(proxies))
            }
            for future in as_completed(futures):
                if future.result():
                    success_posts.append(futures[future])
        self.move_files()  # my action with the downloaded data
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt while processing posts.")
    except Exception as e:
        logger.error(f"Exception while processing posts. Error: {e}")
    finally:
        return success_posts
Code Summary: This method implements multithreading for parallel data downloading, significantly speeding up the process.
The cycle function from itertools repeats the proxy list endlessly, so zip keeps pairing each post with the next proxy until the list of posts runs out. Very handy.
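A quick illustration of that zip-and-cycle pairing, using throwaway values:

from itertools import cycle

posts = [101, 102, 103, 104, 105]
proxies = ["proxy_a", "proxy_b"]
print(list(zip(posts, cycle(proxies))))
# [(101, 'proxy_a'), (102, 'proxy_b'), (103, 'proxy_a'), (104, 'proxy_b'), (105, 'proxy_a')]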
7. Assembling the Entire Class
import requests
from requests.adapters import HTTPAdapter, Retry
from itertools import cycle
from urllib.parse import urljoin, urlencode
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

logger = logging.getLogger(__name__)

class DownloadWorker(object):
    def __init__(self, base_url: str = "https://your-api-endpoint.com/"):
        self.BASE_URL = base_url
        self.MAX_RETRIES = 10
        self.RETRY_DELAY = 1
        # Additional class attributes as needed

    def load_proxies(self, file_path="proxies.txt"):
        # Read one proxy URL per line, skipping blank lines.
        with open(file_path, "r") as f:
            return [line.strip() for line in f if line.strip()]

    def configure_retry_strategy(self):
        retry_strategy = Retry(
            total=self.MAX_RETRIES,
            backoff_factor=self.RETRY_DELAY,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        return HTTPAdapter(max_retries=retry_strategy)

    def _fetch_content(self, params, endpoint, proxy=None):
        session = requests.Session()
        retry_adapter = self.configure_retry_strategy()
        session.mount("http://", retry_adapter)
        session.mount("https://", retry_adapter)
        full_url = urljoin(self.BASE_URL, endpoint)
        query_string = urlencode(params)
        full_url_with_params = urljoin(full_url, "?" + query_string)
        try:
            if proxy:
                session.proxies = {"http": proxy, "https": proxy}
            response = session.get(full_url_with_params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
        return {}

    def process(self, posts: list):
        success_posts = []
        try:
            proxies = self.load_proxies()
            with ThreadPoolExecutor(max_workers=len(proxies)) as executor:
                # Submit all downloads first, then collect results as threads finish,
                # so the work actually runs in parallel.
                futures = {
                    executor.submit(self.download, post_id, proxy): post_id
                    for post_id, proxy in zip(posts, cycle(proxies))
                }
                for future in as_completed(futures):
                    if future.result():
                        success_posts.append(futures[future])
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt while processing posts.")
        except Exception as e:
            logger.error(f"Exception while processing posts. Error: {e}")
        finally:
            return success_posts

    def download(self, post_id, proxy):
        post_id = int(post_id)
        params = {"id": post_id}
        content = self._fetch_content(params=params, endpoint="content", proxy=proxy)
        # Further actions with the downloaded (or empty) data
        return bool(content)  # report success so process() knows which posts came back
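Finally, a minimal sketch of how the class might be driven end to end; the list of post IDs is illustrative, not taken from my project:

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    worker = DownloadWorker(base_url="https://your-api-endpoint.com/")
    post_ids = [101, 102, 103]  # in practice, your full list of post IDs
    downloaded = worker.process(post_ids)
    logger.info(f"Successfully downloaded {len(downloaded)} of {len(post_ids)} posts")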
This article demonstrates how you can use proxies and multithreading effectively to bypass API limitations and accelerate the download of large volumes of data. This approach significantly reduces the time required for data collection and lets you focus on analyzing and using the data.