# chaostoolkit-experiment

**Repository Path**: lengdanran/chaostoolkit-experiment

## Basic Information

- **Project Name**: chaostoolkit-experiment
- **Default Branch**: master

## Statistics

- **Created**: 2021-12-06
- **Last Updated**: 2021-12-07

## README

# Start Chaos Engineering by ChaosToolkit

> Chaos Engineering is a technique for testing the resilience of a complex system.
> It uses experiments to uncover weaknesses in such a system. In particular, by introducing
> various kinds of disruption in the production environment, we can observe how well the system copes with abnormal conditions and thereby build confidence in it.
> Here I use the open-source Chaos Engineering framework `ChaosToolkit` to walk through a simple chaos experiment.

## Choosing the target system

The target is a small Flask-based system made up of the following components:

- `DataSourceService`: simulates a database service and acts as the data source for the whole system
- `ShowDataService`: simulates a front-end service that displays data
- `Gateway`: simulates Nginx and forwards requests
- `Keeper`: a background daemon that automatically creates a new service process instance when a service becomes unavailable

I start several separate processes to simulate a containerized cluster deployment in production, raising the availability of the whole system by adding redundancy. The `Gateway` distributes client requests across this small pseudo-cluster.

## Writing the Experiment.json experiment plan

Below is the sample configuration given in the official ChaosToolkit documentation:

```json
{
  "title": "What is the impact of an expired certificate on our application chain?",
  "description": "If a certificate expires, we should gracefully deal with the issue.",
  "tags": ["tls"],
  "steady-state-hypothesis": {
    "title": "Application responds",
    "probes": [
      {
        "type": "probe",
        "name": "the-astre-service-must-be-running",
        "tolerance": true,
        "provider": {
          "type": "python",
          "module": "os.path",
          "func": "exists",
          "arguments": {
            "path": "astre.pid"
          }
        }
      },
      {
        "type": "probe",
        "name": "the-sunset-service-must-be-running",
        "tolerance": true,
        "provider": {
          "type": "python",
          "module": "os.path",
          "func": "exists",
          "arguments": {
            "path": "sunset.pid"
          }
        }
      },
      {
        "type": "probe",
        "name": "we-can-request-sunset",
        "tolerance": 200,
        "provider": {
          "type": "http",
          "timeout": 3,
          "verify_tls": false,
          "url": "https://localhost:8443/city/Paris"
        }
      }
    ]
  },
  "method": [
    {
      "type": "action",
      "name": "swap-to-expired-cert",
      "provider": {
        "type": "process",
        "path": "cp",
        "arguments": "expired-cert.pem cert.pem"
      }
    },
    {
      "type": "probe",
      "name": "read-tls-cert-expiry-date",
      "provider": {
        "type": "process",
        "path": "openssl",
        "arguments": "x509 -enddate -noout -in cert.pem"
      }
    },
    {
      "type": "action",
      "name": "restart-astre-service-to-pick-up-certificate",
      "provider": {
        "type": "process",
        "path": "pkill",
        "arguments": "--echo -HUP -F astre.pid"
      }
    },
    {
      "type": "action",
      "name": "restart-sunset-service-to-pick-up-certificate",
      "provider": {
        "type": "process",
        "path": "pkill",
        "arguments": "--echo -HUP -F sunset.pid"
      },
      "pauses": {
        "after": 1
      }
    }
  ],
  "rollbacks": [
    {
      "type": "action",
      "name": "swap-to-valid-cert",
      "provider": {
        "type": "process",
        "path": "cp",
        "arguments": "valid-cert.pem cert.pem"
      }
    },
    {
      "ref": "restart-astre-service-to-pick-up-certificate"
    },
    {
      "ref": "restart-sunset-service-to-pick-up-certificate"
    }
  ]
}
```

The `jsonpath` tolerance used in my own experiment file further down relies on the library's `jsonpath` extra, installed with `pip install chaostoolkit-lib[jsonpath]`.

Now let's read through this experiment plan section by section.

![](./imgs/experiment-json.jpg)

As the figure shows, the file does not have many sections to configure, just the following six:

- title: a name for this chaos experiment
- description: a short summary of this chaos experiment
- tags: labels
- steady-state-hypothesis: defines the steady-state hypothesis
- method: defines the series of disturbances applied to the system during the experiment, mainly of two kinds, `action` and `probe`
- rollbacks: once the experiment ends, the operations performed on the system should be rolled back so that the system returns to its pre-experiment state (optional)

Of these six, only the last three really matter.

### steady-state-hypothesis: defining the steady-state hypothesis

This section defines the metrics that indicate the system is running in its normal steady state. For example, at a load of 10,000 QPS, a given endpoint should return status code 200. As long as the endpoint responds normally under that condition, we consider the system to be working normally.

The steady-state hypothesis consists of one or more probes, each with a corresponding tolerance. Each probe looks up a property of the target system and checks whether its value falls within that tolerance.

### The experiment.json file used in this experiment

```json
{
  "title": "<======System Chaos Experiment======>",
  "description": "<===Simple Chaos Experiment By ChaosToolkit===>",
  "tags": [
    "Chaostoolkit Experiment"
  ],
  "steady-state-hypothesis": {
    "title": "System State Before Experiment",
    "probes": [
      {
        "type": "probe",
        "name": "<====System GetData Interface Test====>",
        "tolerance": {
          "type": "jsonpath",
          "path": "$.data",
          "expect": "Handle the get http request method",
          "target": "body"
        },
        "provider": {
          "type": "http",
          "timeout": 20,
          "verify_tls": false,
          "url": "http://localhost:5000/getData"
        }
      },
      {
        "type": "probe",
        "name": "<====System ShowData Interface Test====>",
        "tolerance": {
          "type": "jsonpath",
          "path": "$.data",
          "expect": "Handle the get http request method",
          "target": "body"
        },
        "provider": {
          "type": "http",
          "timeout": 20,
          "verify_tls": false,
          "url": "http://localhost:5000/showData"
        }
      },
      {
        "type": "probe",
        "name": "<=====python module call=====>",
        "tolerance": "this is a test func output",
        "provider": {
          "type": "python",
          "module": "chaostkex.experiment",
          "func": "test",
          "arguments": {}
        }
      }
    ]
  },
  "method": [
    {
      "type": "action",
      "name": "Kill 1 service instance of DataSourceService",
      "provider": {
        "type": "python",
        "module": "chaostkex.experiment",
        "func": "kill_services",
        "arguments": {
          "num": 1,
          "port_file_path": "E:/desktop/chaosmodule/chaostest/ports/dataSourcePort.txt"
        }
      }
    },
    {
      "type": "action",
      "name": "Kill 1 service instance of ShowSourceService",
      "provider": {
        "type": "python",
        "module": "chaostkex.experiment",
        "func": "kill_services",
        "arguments": {
          "num": 1,
          "port_file_path": "E:/desktop/chaosmodule/chaostest/ports/dataShowPort.txt"
        }
      }
    }
  ],
  "rollbacks": []
}
```

## Chaos experiment steps

The architecture here is fairly simple, and the DataSource service is independent of the other services. The chaos experiment tests whether the two endpoints the system exposes, `http://127.0.0.1:5000/getData` and `http://127.0.0.1:5000/showData`, work correctly. Requests enter through the gateway, are dispatched to a server, and the response is returned to the caller.

The overall experiment is simple:

- Kill one process each of the DataSource and ShowData services, then check whether the two exposed endpoints still work.

## Writing the service driver

So that Chaostoolkit can run actions and probes against the target system during the experiment, it needs a custom driver module for that system. Here is the driver I used this time:

```python
import os
import platform

from chaosservices import DataSourceService, ShowDataService


def test():
    # Trivial probe used to check that chaostoolkit can call into this module
    print("this is a test func output")
    return "this is a test func output"


def kill_services_by_ports(ports: list = []) -> bool:
    sysstr = platform.system()
    if (sysstr == "Windows"):
        try:
            for port in ports:
                # Find the netstat entries that mention this port
                with os.popen('netstat -ano|findstr "%d"' % int(port)) as res:
                    res = res.read().split('\n')
                result = []
                for line in res:
                    temp = [i for i in line.split(' ') if i != '']
                    if len(temp) > 4:
                        result.append({'pid': temp[4],
                                       'address': temp[1], 'state': temp[3]})
                for r in result:
                    # PID 0 is the system idle process, so skip it
                    if int(r['pid']) == 0:
                        continue
                    os.system(command="taskkill /f /pid %d" % int(r['pid']))
        except Exception as e:
            print(e)
            return False
        return True
    else:
        print("Other System tasks")
        for port in ports:
            command = '''kill -9 $(netstat -nlp | grep :''' + \
                      str(port) + ''' | awk '{print $7}' | awk -F"/" '{ print $1 }')'''
            os.system(command)
        return True


def get_ports(port_file_path: str) -> list:
    # Read the de-duplicated list of ports recorded in the port file
    if port_file_path is None or os.path.exists(port_file_path) is False:
        raise FileNotFoundError
    ports = []
    with open(port_file_path, 'r') as f:
        lines = f.readlines()
        for line in lines:
            if line.strip() != '':
                ports.append(line.strip())
    return list(set(ports))


def kill_services(num: int = 1, port_file_path: str = '') -> bool:
    # Kill up to `num` service instances listed in the port file
    if num < 1:
        return True
    ports = get_ports(port_file_path=port_file_path)
    cnt = min(num, len(ports))
    for i in range(0, cnt):
        kill_services_by_ports([ports[i]])
    return True


def start_datasource_service(port: int = 8080, portsfile: str = None) -> bool:
    DataSourceService.start(port=port, portsfile=portsfile)
    return True


def start_showdata_service(port: int = 8090, portsfile: str = None) -> bool:
    ShowDataService.start(port=port, portsfile=portsfile)
    return True


if __name__ == '__main__':
    # port_file_path = '../chaosservices/ports/dataSourcePort.txt'
    # kill_services(num=1, port_file_path=port_file_path)
    kill_services_by_ports([8080])
```

## Target system programs

### DataSource

```python
from typing import Dict

from flask import Flask, request

app = Flask(__name__)


@app.route("/", methods=["GET"])
def getData() -> Dict[str, str]:
    if request.method == "GET":
        return {"data": "Handle the get http request method"}
    else:
        return {"data": "Other methods handled."}


def clear_file(portsfile=None) -> None:
    # Empty the port file
    f = open(portsfile, 'w')
    f.truncate()
    f.close()


def start(host='127.0.0.1', port=8080, portsfile='./ports/dataSourcePort.txt') -> None:
    print("[Info]:\tServe on %s" % str(port))
    clear_file(portsfile=portsfile)
    # Record the port so the driver and the Keeper can find this instance
    with open(portsfile, "a+") as f:
        f.write(str(port) + '\n')
    app.run(host=host, port=port, debug=False)


if __name__ == '__main__':
    start(port=8080, portsfile='E:/desktop/chaosmodule/chaostest/ports/dataSourcePort.txt')
```

### ShowDataService

```python
import requests as net_req
from flask import Flask

app = Flask(__name__)

# With command-line arguments added, chaostoolkit does not recognize the module correctly
# parser = argparse.ArgumentParser(description='manual to this script')
# parser.add_argument("--host", type=str, default="127.0.0.1")
# parser.add_argument("--port", type=int, default=8090)
# parser.add_argument("--portsfile", type=str, default='./ports/showPort.txt')
# args = parser.parse_args()

url = 'http://127.0.0.1:5000/getData'


@app.route('/', methods=['GET'])
def show_data() -> str:
    # Fetch the data through the gateway and return it unchanged
    rsp = net_req.get(url=url)
    print(rsp)
    return rsp.text


def clear_file(portsfile=None) -> None:
    f = open(portsfile, 'w')
    f.truncate()
    f.close()


def start(host='127.0.0.1', port=8090, portsfile='./ports/dataShowPort.txt') -> None:
    print("[Info]:\tServe on %s" % str(port))
    clear_file(portsfile=portsfile)
    with open(portsfile, "a+") as f:
        f.write(str(port) + '\n')
    app.run(host=host, port=port, debug=False)


if __name__ == '__main__':
    start(port=8090, portsfile='E:/desktop/chaosmodule/chaostest/ports/dataShowPort.txt')
```

### Gateway

```python
import requests as net
import json
import sys
from flask import Flask, request

app = Flask(__name__)

# List of data source servers
datasource = []
# List of front-end data display services
datashow = []

datasource_idx = 0
datashow_idx = 0


@app.route('/getData', methods=['GET'])
def get_data() -> str:
    print('[====INFO===]:\tHandle the request from %s' % request.url)
    res = get(urls=datasource)
    return res if res != '' else 'There is no DataSourceService available.'


@app.route('/showData', methods=['GET'])
def show_data() -> str:
    print('[====INFO===]:\tHandle the request from %s' % request.url)
    res = get(urls=datashow)
    return res if res != '' else 'There is no ShowDataService available.'


def get(urls: list) -> str:
    """
    Given a list of URLs, request the first one that works
    and return its response.
    :param urls: collection of URLs
    :return: the response as a str
    """
    for url in urls:
        try:
            rsp = net.get(url, timeout=10)
            print('[====INFO====]:\tForward this request to %s' % url)
            return rsp.text
        except Exception as e:
            print("[====EXCEPTION====]:\t%s" % e)
            continue
    return ''


def _get_configuration(file_path='./conf/gateway.json') -> None:
    """
    Load the configuration from a file.
    :param file_path: path to the configuration file, './conf/gateway.json' by default
    :return: None
    """
    print('[====INFO====]:\tLoad configuration from file : %s' % file_path)
    with open(file_path) as f:
        conf = json.load(f)
        global datasource, datashow
        datasource = conf["datasource"]
        datashow = conf["datashow"]


if __name__ == '__main__':
    print('[====INFO====]:\tLoads the configuration......')
    try:
        _get_configuration()
    except IOError as error:
        print('[====ERROR====]:\t%s' % error)
        sys.exit(-1)
    print('[====INFO====]:\tStart the Gateway...')
    app.run(host='127.0.0.1', port=5000, debug=False)
```

### Keeper

This program monitors the state of the services. If a service is unavailable, it automatically starts a new instance so that the system keeps working.

```python
import os
import socket
import time
import DataSourceService, ShowDataService
from multiprocessing import Process


def get_ports(port_file_path: str) -> list:
    if port_file_path is None or os.path.exists(port_file_path) is False:
        raise FileNotFoundError
    ports = []
    with open(port_file_path, 'r') as f:
        lines = f.readlines()
        for line in lines:
            if line.strip() != '':
                ports.append(int(line.strip()))
    return list(set(ports))


def get_available_service(port_file: str = None) -> bool:
    # True if at least one port listed in the file accepts connections
    if port_file is None:
        return False
    ports = get_ports(port_file_path=port_file)
    for p in ports:
        if check_port_in_use(port=p):
            return True
    return False


def check_port_in_use(host='127.0.0.1', port=8080) -> bool:
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(1)
        s.connect((host, int(port)))
        return True
    except socket.error:
        return False
    finally:
        if s:
            s.close()


def creat(func, args):
    p = Process(target=func, args=args)
    p.start()


def start(port_files: list = []) -> None:
    sleep_time = 5
    while True:
        print('Start Checking...')
        # Get the port list file for each service
        port_file = port_files[0]
        # Check whether any service instance is available
        if get_available_service(port_file=port_file) is False:
            # No instance is available, so create a new one
            print('[===INFO===]:\tCreating a DataSourceService instance')
            ports = get_ports(port_file_path=port_file)
            if len(ports) == 0:
                last = 8080
            else:
                # get_ports() builds its list from a set, so the order is
                # arbitrary; take the highest port rather than ports[-1]
                last = max(ports)
            new_p = last + 1
            DataSourceService.clear_file(portsfile=port_file)
            creat(func=DataSourceService.start, args=('127.0.0.1', new_p, port_file,))
        port_file = port_files[1]
        # Check whether any service instance is available
        if get_available_service(port_file=port_file) is False:
            # No instance is available, so create a new one
            print('[===INFO===]:\tCreating a ShowDataService instance')
            ports = get_ports(port_file_path=port_file)
            if len(ports) == 0:
                last = 8090
            else:
                last = max(ports)
            new_p = last + 1
            ShowDataService.clear_file(portsfile=port_file)
            creat(func=ShowDataService.start, args=('127.0.0.1', new_p, port_file,))
        time.sleep(sleep_time)


if __name__ == '__main__':
    start(port_files=[
        'E:/desktop/chaosmodule/chaostest/ports/dataSourcePort.txt',
        'E:/desktop/chaosmodule/chaostest/ports/dataShowPort.txt'
    ])
```

## Running the experiment

### The system has a weakness: the Keeper daemon is not running

In this setup only a single Gateway, DataSource, and ShowData service is started. Following the experiment's logic, the DataSource and ShowData services are killed, so the endpoints the system exposes are bound to fail. Chaostoolkit should detect this obvious lack of resilience for us.

```shell
$ chaos run experiment.json
```

Start the target system:

![](./imgs/1.jpg)

Result:

![](./imgs/2.jpg)

The output clearly contains the following line:

```shell
[2021-12-06 17:31:50 CRITICAL] Steady state probe '<====System GetData Interface Test====>' is not in the given tolerance so failing this experiment
```

This shows that chaostoolkit found the weakness in the system's resilience, and that it was detected while verifying the `<====System GetData Interface Test====>` probe.

```shell
[2021-12-06 17:31:50 INFO] Experiment ended with status: deviated
[2021-12-06 17:31:50 INFO] The steady-state has deviated, a weakness may have been discovered
```

The directory where `chaos run` was executed now contains a `journal.json` file generated by the experiment, which holds the detailed report data.

### Starting 2 service instances

The weakness above arises because each service runs as a single instance, so availability is low. A simple way to raise availability is to add redundancy. In this run I started 2 instances each of DataSource and ShowData and ran the chaos experiment again.

![](./imgs/3.jpg)

As shown, with the added redundancy the system keeps running normally after the disturbance is injected.

### Starting the Keeper daemon

Besides adding redundancy, another way to address the problem is to run a monitoring process that continuously watches the service state and spawns a new service instance as soon as a service fails, which also improves availability.

![](./imgs/4.jpg)

![](./imgs/5.jpg)

![](./imgs/6.jpg)

As shown, the system's resilience has improved here as well!
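
To compare the three runs without relying on screenshots, the `journal.json` written by each `chaos run` can be summarized with a few lines of Python. The following is only a minimal sketch: the helper name `summarize_journal` is hypothetical, and the field names it reads (`status`, `deviated`, `steady_states`, `steady_state_met`, `tolerance_met`) are assumptions about the journal layout that should be checked against the journal your Chaos Toolkit version actually produces.

```python
import json
from typing import Any, Dict, List


def summarize_journal(journal: Dict[str, Any]) -> List[str]:
    """Return readable summary lines for one journal dict.

    Assumption: the journal has top-level "status" and "deviated" keys and a
    "steady_states" mapping with "before"/"after" entries. Missing keys are
    tolerated and reported as None or "not run".
    """
    lines = [
        "status: %s" % journal.get("status"),
        "deviated: %s" % journal.get("deviated"),
    ]
    steady_states = journal.get("steady_states") or {}
    for phase in ("before", "after"):
        state = steady_states.get(phase)
        if not state:
            # e.g. the hypothesis was never evaluated in this phase
            lines.append("%s: not run" % phase)
            continue
        lines.append("%s: steady_state_met=%s" % (phase, state.get("steady_state_met")))
        for probe in state.get("probes", []):
            name = probe.get("activity", {}).get("name")
            lines.append("  %s -> tolerance_met=%s" % (name, probe.get("tolerance_met")))
    return lines


def load_and_summarize(path: str = "journal.json") -> List[str]:
    """Read a journal file from disk and summarize it."""
    with open(path, encoding="utf-8") as f:
        return summarize_journal(json.load(f))
```

Calling `print("\n".join(load_and_summarize()))` in the directory where `chaos run` was executed prints one summary per run, which makes it easy to see that the single-instance run deviated while the redundant and Keeper-backed runs did not.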