부하 테스트

웹 사이트, 웹 애플리케이션 또는 API 엔드포인트를 로드 테스트하는 것은 항상 코드가 확장되는지 확인하는 것이 좋습니다.

C05348A3-9AB8-42C9-A6E0-81DB3AC59FEB
           

부하 테스트

동적 웹 사이트(CMS), 웹 애플리케이션 또는 API를 빌드하는지 여부에 관계없이 항상 코드에 대한 부하 테스트를 수행하여 동시성 오류가 없고 인프라가 확장되는지 확인하는 것이 좋습니다. 사용 가능한 도구가 많이 있지만(일부는 웹 개발 도구 아래에 나열됨) Python에서 쉽게 수행할 수 있습니다. 이 예제 스크립트는 많은 동시 요청을 생성하지만 단일 시스템에서 실행하면 적당한 양의 트래픽만 생성하지만 대규모로 인기 있는 웹 사이트에는 여전히 대규모 테스트가 필요합니다.

또한 각 아키텍처에는 한계점이 있으므로 인프라를 모니터링하는 동안 많은 양의 로드를 생성하면 가장 약한 링크의 문제를 식별하고 해결할 수 있지만 이렇게 하면 다음으로 약한 링크만 발견할 수 있습니다. 등등.

공개 엔드포인트를 로드 테스트하는 것은 훌륭한 옵션이지만 개별 백엔드 엔드포인트를 로드 테스트하여 압박을 받는 상황에서 어떻게 확장되고 동작하는지 확인할 수도 있습니다.

부하 테스트 스크립트 사용

아래 스크립트에서 <your_website_url> 을 ' https://www.mysite.com/ ' 프로토콜을 포함하여 웹사이트의 URL로 바꿉니다.

기본 http_connection_timeout 은 45초이고 예상되는 HTTP 응답 코드는 200이며 기본적으로 웹 사이트에서 총 100개의 동시 요청에 대해 각각 10회의 호출을 수행하는 10개의 스레드를 생성합니다. 우리는 의도적으로 Python 세션을 사용하지 않았으며 각 호출이 자체 HTTP 세션을 설정하기를 원했습니다.

스크립트

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
 
'''
Load testing tool for HTTP requests.
'''
 
# I M P O R T S ###############################################################
 
 
import requests
import sys
import os
import logging
from logging.handlers import RotatingFileHandler
import traceback
from datetime import datetime, timedelta
import threading
import time
from functools import reduce
 
requests.packages.urllib3.disable_warnings()
logging.getLogger("requests").setLevel(logging.WARNING)
 
 
__author__ = "Videre Research, LLC"
__version__ = "1.0.0"
 
# G L O B A L S ###############################################################
 
 
url = '<your_website_url>'
 
http_connection_timeout = 45    # 45 seconds
headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
expectedResponse = 200
num_threads = 10
num_calls = 10
 
errors = {}
threadDuration = []
startTime = []
threadLock = threading.Lock()
 
intervals = (
    ('w', 604800),  # 60 * 60 * 24 * 7
    ('d', 86400),   # 60 * 60 * 24
    ('h', 3600),    # 60 * 60
    ('m', 60),
    ('s', 1),
)
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))
try:
    logFile = os.path.realpath(__file__) + ".log"
    fh = RotatingFileHandler(logFile, maxBytes=(1048576 * 50), backupCount=7)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.info("Log file:    %s" % logFile)
except Exception as e:
    logger.warning("Unable to log to file: %s - %s" % (logFile, e))
 
 
# F U N C T I O N S ###########################################################
 
 
def time_this(original_function):
    """Wrapper to print the time it takes for a function to execute."""
    def new_function(*args, **kwargs):
        before = datetime.now()
        x = original_function(*args, **kwargs)
        after = datetime.now()
        global logger
        logger.debug('Duration of %s: %.4fs' % (original_function.__name__, (after - before).total_seconds()))
        return x
    return new_function
 
 
def display_time(seconds, granularity=2):
    """Display time with the appropriate unit."""
    result = []
    for name, count in intervals:
        value = seconds // count
        if value:
            seconds -= value * count
            if value == 1:
                name = name.rstrip('s')
            result.append("{0:.0f}{1}".format(value, name))
    return ' '.join(result[:granularity])
 
 
def total_seconds(dt):
    # Keep backward compatibility with Python 2.6 which doesn't have
    # this method
    if hasattr(datetime, 'total_seconds'):
        return dt.total_seconds()
    else:
        return (dt.microseconds + (dt.seconds + dt.days * 24 * 3600) * 10**6) / 10**6
 
 
@time_this
def send_request(s, verb, url, rc, headers="", data="", terminate=False):
    """Generic method to send an http request."""
    # logger.info("Sending request %s" % url)
 
    try:
        r = s.request(method=verb, url=url, data=data, headers=headers, timeout=http_connection_timeout)
        # r = requests.request(method=verb, url=url, data=data, headers=headers, verify=False, timeout=http_connection_timeout, auth=auth)
        #logger.debug(r.headers)
        #logger.debug(r.text)
 
        if r.status_code == rc:
            return r.status_code, r.text
            #logger.debug(r.text)
        else:
            logger.error(r.status_code)
            logger.error(r.text)
            logger.error(r.headers)
            logger.error(sys.exc_info()[:2])
            if terminate:
                sys.exit(1)
            else:
                return r.status_code, r.text
    except Exception as e:
        logger.error("Error in http request: " + str(e))
        if terminate:
            sys.exit(1)
        else:
            return 0, 'ERROR'
 
 
def ProcessFileThread(i):
    """This is the worker thread function that will process files in the queue."""
    s = requests.Session()
 
    for j in range(num_calls):
        start = time.time()
        rc, response = send_request(s, "GET", url, expectedResponse, headers, False)
        # print(response)
 
        elapsed = str(time.time() - start)
 
        with threadLock:
            threadDuration.append(float(elapsed))
            if rc != expectedResponse:
                print(rc)
                if rc in errors:
                    errors[rc] += 1
                else:
                    errors[rc] = 1
 
 
@time_this
def main():
    """Main function."""
    global auth
    global url
 
    before = datetime.now()
 
    for i in range(num_threads):
        worker = threading.Thread(target=ProcessFileThread, args=(i + 1,))
        worker.setDaemon(True)
        worker.start()
 
    logger.info('*** Main thread waiting')
    worker.join()
    logger.info('*** Main thread Done')
    time.sleep(1)
    after = datetime.now()
    logger.info('Duration  %s' % display_time(total_seconds(after - before)))
 
    logger.info(("Calls:             ", len(threadDuration)))
    logger.info(("Min Response time: ", min(threadDuration)))
    logger.info(("Max Response time: ", max(threadDuration)))
    logger.info(("Avg Response time: ", reduce(lambda x, y: x + y, threadDuration) / len(threadDuration)))
    logger.info(errors)
 
    sys.exit(0)
 
###############################################################################
 
 
if __name__ == "__main__":
    main()
 
# E N D   O F   F I L E #######################################################

스크립트 출력

스크립트는 마지막 스레드가 완료될 때까지 아무 것도 표시하지 않고 아래 요약을 표시합니다. 로그 파일도 생성되며 {} 마지막 줄에는 발생한 오류(지정한 것과 다른 HTTP 반환 코드)의 수가 포함됩니다.

책임감 있게 즐기세요. 다른 사람의 웹사이트를 로드 테스트하지 마세요!

python3 load_test.py 
2022-06-30 09:47:49,908 - INFO - --------------------------------------------------------------------------------
2022-06-30 09:47:49,908 - INFO - Version:  1.0.0
2022-06-30 09:47:49,909 - INFO - Path:    /Users/me/load_test.py
2022-06-30 09:47:49,910 - INFO - Log file:    /Users/me/load_test.py.log
2022-06-30 09:47:49,923 - INFO - *** Main thread waiting
2022-06-30 09:47:52,215 - INFO - *** Main thread Done
2022-06-30 09:47:53,221 - INFO - Duration  3s
('Calls:             ', 100)
('Min Response time: ', 0.13335084915161133)
('Max Response time: ', 0.46799778938293457)
('Avg Response time: ', 0.22425265073776246)
{}
댓글을 게시했습니다: 0

Tagged with:
networking