負荷テスト

Webサイト、Webアプリケーション、またはAPIエンドポイントの負荷テストは、コードがスケーリングすることを確認するために常に良い考えです。

C05348A3-9AB8-42C9-A6E0-81DB3AC59FEB
           

負荷テスト

動的Webサイト(CMS)、Webアプリケーション、またはAPIのいずれを構築する場合でも、コードに対して負荷テストを実行して、同時実行エラーがなく、インフラストラクチャが拡張できることを確認することをお勧めします。利用できるツールはたくさんありますが(一部はWeb開発ツールの下にリストされています)、これはPythonで簡単に実行できます。このサンプルスクリプトは多数の同時リクエストを生成しますが、単一のシステムで実行すると適度な量のトラフィックしか生成されないことに注意してください。大規模なテストは、非常に人気のあるWebサイトでも必要です。

また、各アーキテクチャには限界点があり、インフラストラクチャを監視しながら大量の負荷を生成することで、最も弱いリンクの問題を特定して解決できますが、これにより、次の最も弱いリンクを見つけることしかできなくなります。それなど。

パブリックエンドポイントの負荷テストは優れたオプションですが、個々のバックエンドエンドポイントの負荷テストを行って、プレッシャーの下でどのようにスケーリングおよび動作するかを確認することもできます。

負荷テストスクリプトの使用

以下のスクリプトで、 <your_website_url>をプロトコル' https://www.mysite.com/ 'を含むWebサイトのURLに置き換えます。

デフォルトのhttp_connection_timeoutは45秒、予想されるHTTP応答コードは200であり、デフォルトでは、それぞれが10回の呼び出しを行う10個のスレッドを作成し、Webサイトで合計100個の同時要求を行います。意図的にPythonセッションを使用せず、各呼び出しで独自のHTTPセッションを確立する必要がありました。

スクリプト

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
 
'''
Load testing tool for HTTP requests.
'''
 
# I M P O R T S ###############################################################
 
 
import requests
import sys
import os
import logging
from logging.handlers import RotatingFileHandler
import traceback
from datetime import datetime, timedelta
import threading
import time
from functools import reduce
 
requests.packages.urllib3.disable_warnings()
logging.getLogger("requests").setLevel(logging.WARNING)
 
 
__author__ = "Videre Research, LLC"
__version__ = "1.0.0"
 
# G L O B A L S ###############################################################
 
 
url = '<your_website_url>'
 
http_connection_timeout = 45    # 45 seconds
headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
expectedResponse = 200
num_threads = 10
num_calls = 10
 
errors = {}
threadDuration = []
startTime = []
threadLock = threading.Lock()
 
intervals = (
    ('w', 604800),  # 60 * 60 * 24 * 7
    ('d', 86400),   # 60 * 60 * 24
    ('h', 3600),    # 60 * 60
    ('m', 60),
    ('s', 1),
)
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))
try:
    logFile = os.path.realpath(__file__) + ".log"
    fh = RotatingFileHandler(logFile, maxBytes=(1048576 * 50), backupCount=7)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.info("Log file:    %s" % logFile)
except Exception as e:
    logger.warning("Unable to log to file: %s - %s" % (logFile, e))
 
 
# F U N C T I O N S ###########################################################
 
 
def time_this(original_function):
    """Wrapper to print the time it takes for a function to execute."""
    def new_function(*args, **kwargs):
        before = datetime.now()
        x = original_function(*args, **kwargs)
        after = datetime.now()
        global logger
        logger.debug('Duration of %s: %.4fs' % (original_function.__name__, (after - before).total_seconds()))
        return x
    return new_function
 
 
def display_time(seconds, granularity=2):
    """Display time with the appropriate unit."""
    result = []
    for name, count in intervals:
        value = seconds // count
        if value:
            seconds -= value * count
            if value == 1:
                name = name.rstrip('s')
            result.append("{0:.0f}{1}".format(value, name))
    return ' '.join(result[:granularity])
 
 
def total_seconds(dt):
    # Keep backward compatibility with Python 2.6 which doesn't have
    # this method
    if hasattr(datetime, 'total_seconds'):
        return dt.total_seconds()
    else:
        return (dt.microseconds + (dt.seconds + dt.days * 24 * 3600) * 10**6) / 10**6
 
 
@time_this
def send_request(s, verb, url, rc, headers="", data="", terminate=False):
    """Generic method to send an http request."""
    # logger.info("Sending request %s" % url)
 
    try:
        r = s.request(method=verb, url=url, data=data, headers=headers, timeout=http_connection_timeout)
        # r = requests.request(method=verb, url=url, data=data, headers=headers, verify=False, timeout=http_connection_timeout, auth=auth)
        #logger.debug(r.headers)
        #logger.debug(r.text)
 
        if r.status_code == rc:
            return r.status_code, r.text
            #logger.debug(r.text)
        else:
            logger.error(r.status_code)
            logger.error(r.text)
            logger.error(r.headers)
            logger.error(sys.exc_info()[:2])
            if terminate:
                sys.exit(1)
            else:
                return r.status_code, r.text
    except Exception as e:
        logger.error("Error in http request: " + str(e))
        if terminate:
            sys.exit(1)
        else:
            return 0, 'ERROR'
 
 
def ProcessFileThread(i):
    """This is the worker thread function that will process files in the queue."""
    s = requests.Session()
 
    for j in range(num_calls):
        start = time.time()
        rc, response = send_request(s, "GET", url, expectedResponse, headers, False)
        # print(response)
 
        elapsed = str(time.time() - start)
 
        with threadLock:
            threadDuration.append(float(elapsed))
            if rc != expectedResponse:
                print(rc)
                if rc in errors:
                    errors[rc] += 1
                else:
                    errors[rc] = 1
 
 
@time_this
def main():
    """Main function."""
    global auth
    global url
 
    before = datetime.now()
 
    for i in range(num_threads):
        worker = threading.Thread(target=ProcessFileThread, args=(i + 1,))
        worker.setDaemon(True)
        worker.start()
 
    logger.info('*** Main thread waiting')
    worker.join()
    logger.info('*** Main thread Done')
    time.sleep(1)
    after = datetime.now()
    logger.info('Duration  %s' % display_time(total_seconds(after - before)))
 
    logger.info(("Calls:             ", len(threadDuration)))
    logger.info(("Min Response time: ", min(threadDuration)))
    logger.info(("Max Response time: ", max(threadDuration)))
    logger.info(("Avg Response time: ", reduce(lambda x, y: x + y, threadDuration) / len(threadDuration)))
    logger.info(errors)
 
    sys.exit(0)
 
###############################################################################
 
 
if __name__ == "__main__":
    main()
 
# E N D   O F   F I L E #######################################################

スクリプト出力

スクリプトは、最後のスレッドが完了するまで何も表示せず、その後、以下の要約を表示します。ログファイルも作成され、最後の行{}には、発生したエラー(指定したもの以外のHTTPリターンコード)のカウントが含まれることに注意してください。

責任を持って楽しんでください。他の人のWebサイトをロードテストしないでください。

python3 load_test.py 
2022-06-30 09:47:49,908 - INFO - --------------------------------------------------------------------------------
2022-06-30 09:47:49,908 - INFO - Version:  1.0.0
2022-06-30 09:47:49,909 - INFO - Path:    /Users/me/load_test.py
2022-06-30 09:47:49,910 - INFO - Log file:    /Users/me/load_test.py.log
2022-06-30 09:47:49,923 - INFO - *** Main thread waiting
2022-06-30 09:47:52,215 - INFO - *** Main thread Done
2022-06-30 09:47:53,221 - INFO - Duration  3s
('Calls:             ', 100)
('Min Response time: ', 0.13335084915161133)
('Max Response time: ', 0.46799778938293457)
('Avg Response time: ', 0.22425265073776246)
{}
投稿コメント 0

Tagged with:
networking