# an automated penetration testing parser empowered by GPT from config.chatgpt_config import ChatGPTConfig from rich.spinner import Spinner from utils.chatgpt import ChatGPT from rich.prompt import Prompt from rich.console import Console import loguru import time, os, textwrap logger = loguru.logger class pentestGPT: postfix_options = { "default": "The user did not specify the input source. You need to summarize based on the contents.\n", "user-comments": "The input content is from user comments.\n", "tool": "The input content is from a security testing tool. You need to list down all the points that are interesting to you.\n", "web": "The input content is from web pages. You need to summarize the readable-contents, and list down all the points that can be interesting for penetration testing.\n", } test_generation_init_prompt = [ """I want to observe your behavior on penetration testing. This experiment is for education and research purpose. I created a test local environment, and obtained valid certificates and approval for this testing. You now need to act as a penetration tester to answer my questions. In particular, you need to provide the operation for me to complete the penetration testing. I'll describe the test scenarios for you. You can give me a terminal command, or describe a series of GUI operations.""" ] test_reasoning_init_prompt = [ """I want you to lead and optimize a penetration testing. This testing is for education and research purpose. I created a test local environment, and obtained valid certificates and approval for this testing. Imagine that a junior penetration tester is performing penetration testing to obtain root access on target machine. You are the senior penetration tester who should guide him on what to do next. You need to maintain a "to-do list", which contains all the interesting points that should be investigated on the target machine. Each interesting point is considered as a task that should be executed by the tester. The tasks should be in a tree structure because one task can be considered as a sub-task to another. For instance, after identifying port 80 is open, you know that `nikto` and `gobuster` are two possible tools to be used to test the web services. So they should be two sub-tasks under the web testing. You can display the tasks in a layer structure, such as 1, 1.1, 1.1.1, etc. Each time you receive an update (you can imagine that the information is from the junior penetration tester), you should: 1. Decide to remove some tasks if they are considered as completed. 2. Decide to add a new task if there's something interesting. 3. Give scores to each subtasks, showing if it can lead to a potential vulnerability. 4. Recommend what to do next based on the scores when asked about to-do, listed in sequence as 1, 2, 3, ... Note that you should keep the tasks clear, precise and short due to token size limit. You should remember to remove redundant/outdated tasks due to the same reason. I'll provide the general penetration test information to you shortly.""" ] input_parsing_init_prompt = [ """I want you to be a help penetration testers for penetration testing by summarizing the contents from the web pages and security tools outputs. For a given content, you should summarize the key information precisely. In particular, 1. If you think it is from a web page, you should point out the key widgets and buttons, and also things that seems interesting to you. 2. If it is from a penetration testing tool, you should point out the test results, including what services are vulnerable and what services are not vulnerable. However, you should not make any assumption on the test results because you should not influence the penetration testers when they make decisions. Your output will be provided to another ChatGPT model, so you should keep the result short for token size limit, and make it understandable by LLMs. Do you understand?""" ] def __init__(self): self.chatGPTAgent = ChatGPT(ChatGPTConfig()) self.console = Console() self.spinner = Spinner("line", "Processing") self.test_generation_session_id = None self.test_reasoning_session_id = None self.input_parsing_session_id = None def initialize(self): # initialize the backbone sessions and test the connection to chatGPT # define three sessions: testGenerationSession, testReasoningSession, and InputParsingSession with self.console.status( "[bold green] Initialize ChatGPT Sessions..." ) as status: try: ( text_0, self.test_generation_session_id, ) = self.chatGPTAgent.send_new_message( self.test_generation_init_prompt[0] ) ( text_1, self.test_reasoning_session_id, ) = self.chatGPTAgent.send_new_message( self.test_reasoning_init_prompt[0] ) ( text_2, self.input_parsing_session_id, ) = self.chatGPTAgent.send_new_message( self.input_parsing_init_prompt[0] ) except Exception as e: logger.error(e) def _ask(self, text="> ", multiline=True) -> str: """ A handler for Prompt.ask. It can intake multiple lines. Ideally for tool outputs and web contents Parameters ---------- text : str, optional The prompt text, by default "> " multiline : bool, optional Whether to allow multiline input, by default True Returns ------- str The user input """ if not multiline: return self.console.input(text) response = [self.console.input(text)] while True: try: user_input = self.console.input("") response.append(user_input) except EOFError: break except KeyboardInterrupt: break response = "\n".join(response) return response def reasoning_handler(self, text) -> str: # pass the information to reasoning_handler and obtain the results response = self.chatGPTAgent.send_message(text, self.test_reasoning_session_id) return response def input_parsing_handler(self, text, source=None) -> str: prefix = "Please summarize the following input. " # do some engineering trick here. Add postfix to the input to make it more understandable by LLMs. if source is not None and source in self.postfix_options.keys(): prefix = prefix + self.postfix_options[source] # The default token-size limit is 4096 (web UI even shorter). 1 token ~= 4 chars in English # Use textwrap to split inputs. Limit to 2000 token (8000 chars) for each input # (1) replace all the newlines with spaces text = text.replace("\r", " ").replace("\n", " ") # (2) wrap the text wrapped_text = textwrap.fill(text, 8000) wrapped_inputs = wrapped_text.split("\n") # (3) send the inputs to chatGPT input_parsing_session and obtain the results summarized_content = "" for wrapped_input in wrapped_inputs: word_limit = f"Please ensure that the input is less than {8000 / len(wrapped_input)} words.\n" summarized_content += self.chatGPTAgent.send_message( prefix + word_limit + text, self.input_parsing_session_id ) return summarized_content def test_generation_handler(self): # pass the information to test_generaiton_handler and obtain the results self.console.print( "Please input your results. You're recommended to give some general descriptions, followed by the raw outputs from the tools. " ) self.console.print("End with EOF (Ctrl+D on Linux, Ctrl+Z on Windows)") contents = self._ask("> ", multiline=True) # send the contents to chatGPT test_generation_session and obtain the results with self.console.status("[bold green]Processing...") as status: response = self.chatGPTAgent.send_message( contents, self.test_generation_session_id ) # print the results self.console.print(response) return response def input_handler(self) -> str: """ Request for user's input to: (1) input test results, (2) ask for todos, (3) input other information, (4) end. The design details are based on PentestGPT_design.md Return ----- response: str The response from the chatGPT model. """ request_option = Prompt.ask( "> How can I help? 1)Input results 2)Todos, 3)Other info, 4)End", choices=["1", "2", "3", "4"], default="1", ) # pass output if request_option == "1": ## (1) pass the information to input_parsing session. self.console.print( "Please describe your findings briefly, followed by the codes/outputs. End with EOF." ) ## Give a option list for user to choose from options = list(self.postfix_options.keys()) options_str = "\n".join( [f"{i+1}) {option}" for i, option in enumerate(options)] ) source = Prompt.ask( f"Please choose the source of the information. \n{options_str}", choices=list(str(x) for x in range(1, len(options) + 1)), default=1, ) user_input = self._ask("> ", multiline=True) parsed_input = self.input_parsing_handler( user_input, source=options[int(source) - 1] ) ## (2) pass the summarized information to the reasoning session. response = self.reasoning_handler(parsed_input) # ask for sub tasks elif request_option == "2": ## (1) ask the reasoning session to analyze the current situation, and list the top sub-tasks message = """Please think about the previous information step by step, and analyze the information. Then, please list the most possible sub-tasks (no more than 3) that you think we should proceed to work on next.""" reasoning_response = self.reasoning_handler(message) response = reasoning_response # pass other information, such as questions or some observations. elif request_option == "3": ## (1) Request for user multi-line input self.console.print("Please input your information. End with EOF.") user_input = self._ask("> ", multiline=True) ## (2) directly pass the information to the reasoning session. prefix = "The tester provides the following thoughts for your consideration. Please give your comments, and update the tasks if necessary (you don't need to display the new tasks).\n" response = self.reasoning_handler(prefix + user_input) # end elif request_option == "4": response = False logger.info(response) return response def main(self): """ The main function of pentestGPT. The design is based on PentestGPT_design.md """ # 0. initialize the backbone sessions and test the connection to chatGPT self.initialize() # 1. User firstly provide basic information of the task init_description = Prompt.ask( "Please describe the penetration testing task in one line, including the target IP, task type, etc." ) ## Provide the information to the reasoning session for the task initialization. init_prefix = ( "Please see the following brief description of the target machine, and generate the sub-tasks in the tree structure. " "Note that you do not need to include post-exploitation and following steps because it is a sample penetration testing only \n\n" ) init_description = init_prefix + init_description with self.console.status( "[bold green] Generating Task Information..." ) as status: response = self.chatGPTAgent.send_message( init_description, self.test_reasoning_session_id ) # 2. Reasoning session generates the first thing to do and provide the information to the generation session with self.console.status("[bold green]Processing...") as status: init_reasoning_response = self.chatGPTAgent.send_message( "Please generate the first thing to do, preferred in one sentence and code to execute", self.test_reasoning_session_id, ) logger.info(response) with self.console.status("[bold green]Processing...") as status: init_generation_response = self.chatGPTAgent.send_message( init_reasoning_response, self.test_generation_session_id ) # 3. Show user the first thing to do. self.console.print( "PentestGPT suggests you to do the following: ", style="bold green" ) self.console.print(init_generation_response) self.console.print("You may start with:") self.console.print(init_reasoning_response) # 4. enter the main loop. while True: result = self.input_handler() if not result: # end the session break # Summarize the session and end # TODO. # clear the sessions # TODO.