- from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
- import concurrent.futures
- import random
- import statistics
- import string
- import sys
- import threading
- import time
-
- import requests
-
- HOST = 'http://localhost:9000'
- TIMEOUT_s = 1
- NUM_REQUESTS = 1000
- NUM_CONCURRENT = 5
-
-
- TIMES = []
-
-
- def send_request(idx, url, data, headers):
- try:
- time_before = time.perf_counter()
- response = requests.post(
- url,
- json=data,
- headers=headers,
- timeout=TIMEOUT_s
- )
- time_after = time.perf_counter()
- if response.status_code != 200:
- print(f'Request {idx} failed.')
- raise Exception('{response.status_code} NOT 200')
- time_duration = time_after - time_before
- TIMES.append(time_duration)
- except (requests.exceptions.RequestException, concurrent.futures.TimeoutError) as exc:
- print(f'Request {idx} failed.')
- raise exc
-
-
- def load_test(url, data_list):
- headers = get_headers()
-
- semaphore = threading.BoundedSemaphore(NUM_CONCURRENT)
-
- # Use ThreadPoolExecutor to send requests concurrently
- with ThreadPoolExecutor(max_workers=NUM_CONCURRENT) as executor:
- # Loop to submit and wait for the requests
- for idx, data in enumerate(data_list , 1):
- # Acquire the semaphore before submitting the request
- success = semaphore.acquire(timeout=TIMEOUT_s)
- if not success:
- # timeout reached
- break
-
- def send_one_request():
- send_request(idx, url, data, headers)
- semaphore.release()
- executor.submit(send_one_request)
-
- print('Done.')
-
-
- print(f'{len(TIMES)} requests succeeded.')
- try:
- time_mean = statistics.mean(TIMES)
- print(f'Mean: {time_mean:.4f}')
- except statistics.StatisticsError:
- time_mean = None
- print(f'Mean: {time_mean}')
- try:
- time_max = max(TIMES)
- print(f'Max: {time_max:.4f}')
- except ValueError:
- time_max = 0
- print(f'Max: {time_max}')
-
- try:
- time_max_index = TIMES.index(time_max)
- except ValueError:
- time_max_index = None
- print(f'Max index: {time_max_index}')