# An automated penetration testing parser empowered by GPT.
from config.chatgpt_config import ChatGPTConfig
from rich.spinner import Spinner
from utils.chatgpt import ChatGPT
from rich.console import Console
from prompts.prompt_class import PentestGPTPrompt
from utils.prompt_select import prompt_select, prompt_ask
from prompt_toolkit.formatted_text import HTML
from utils.task_handler import main_task_entry, mainTaskCompleter
import loguru
import time, os, textwrap

logger = loguru.logger
logger.add(sink="logs/pentest_gpt.log")


def prompt_continuation(width, line_number, wrap_count):
    """
    The continuation: display line numbers and '->' before soft wraps.

    Notice that we can return any kind of formatted text from here.

    The prompt continuation doesn't have to be the same width as the prompt
    which is displayed before the first line, but in this example we choose to
    align them. The `width` input that we receive here represents the width of
    the prompt.
    """
    if wrap_count > 0:
        return " " * (width - 3) + "-> "
    else:
        text = ("- %i - " % (line_number + 1)).rjust(width)
        return HTML("%s") % text


class pentestGPT:
    # Postfix prompts appended to summarization requests so the LLM knows
    # where the pasted input came from (tool output, user comment, web page).
    postfix_options = {
        "tool": "The input content is from a security testing tool. You need to list down all the points that are interesting to you; you should summarize it as if you are reporting to a senior penetration tester for further guidance.\n",
        "user-comments": "The input content is from user comments.\n",
        "web": "The input content is from web pages. You need to summarize the readable-contents, and list down all the points that can be interesting for penetration testing.\n",
        "default": "The user did not specify the input source. You need to summarize based on the contents.\n",
    }

    def __init__(self):
        """Set up the LLM agents, console helpers, and per-session state."""
        self.chatGPTAgent = ChatGPT(ChatGPTConfig())
        self.chatGPT4Agent = ChatGPT(ChatGPTConfig(model="gpt-4"))
        self.prompts = PentestGPTPrompt
        self.console = Console()
        self.spinner = Spinner("line", "Processing")
        self.test_generation_session_id = None
        self.test_reasoning_session_id = None
        self.input_parsing_session_id = None
        self.chat_count = 0
        # BUG FIX: the rest of the class reads `step_reasoning_response`
        # (e.g. the `more` option in input_handler); the original code only
        # initialized `step_reasoning`, causing an AttributeError when `more`
        # was used before `next`.
        self.step_reasoning_response = (
            None  # the response from the reasoning session for the current step
        )

    def initialize(self):
        """Initialize the backbone sessions and test the connection to chatGPT.

        Defines three sessions: testGenerationSession, testReasoningSession,
        and InputParsingSession. Failures are logged and surfaced to the user
        (best-effort: the program does not abort here).
        """
        with self.console.status(
            "[bold green] Initialize ChatGPT Sessions..."
        ) as status:
            try:
                (
                    _,
                    self.test_generation_session_id,
                ) = self.chatGPTAgent.send_new_message(
                    self.prompts.generation_session_init,
                )
                (
                    _,
                    self.test_reasoning_session_id,
                ) = self.chatGPT4Agent.send_new_message(
                    self.prompts.reasoning_session_init
                )
                (
                    _,
                    self.input_parsing_session_id,
                ) = self.chatGPTAgent.send_new_message(self.prompts.input_parsing_init)
            except Exception as e:
                logger.error(e)
                # Surface the failure instead of silently claiming success.
                self.console.print(
                    f"Session initialization failed: {e}", style="bold red"
                )
        self.console.print("- ChatGPT Sessions Initialized.", style="bold green")

    def reasoning_handler(self, text) -> str:
        """Pass `text` to the reasoning session and return its response.

        Oversized input (> 8000 chars) is first summarized through the
        input-parsing session to stay within the model's context window.
        """
        if len(text) > 8000:
            text = self.input_parsing_handler(text)
        response = self.chatGPT4Agent.send_message(
            self.prompts.process_results + text, self.test_reasoning_session_id
        )
        return response

    def input_parsing_handler(self, text, source=None) -> str:
        """Summarize arbitrary user-supplied text via the parsing session.

        Parameters
        ----------
        text : str
            Raw content to be summarized.
        source : str, optional
            One of `postfix_options` keys; appends a source-specific hint to
            the summarization prompt.

        Returns
        -------
        str
            Concatenated summaries of each <=8000-char chunk.
        """
        prefix = "Please summarize the following input. "
        # Prompt-engineering trick: a postfix describing the input source
        # makes the content more understandable to the LLM.
        if source is not None and source in self.postfix_options:
            prefix = prefix + self.postfix_options[source]
        # The default token-size limit is 4096 (web UI even shorter).
        # 1 token ~= 4 chars in English. Use textwrap to split inputs;
        # limit to 2000 tokens (8000 chars) per chunk.
        # (1) replace all the newlines with spaces
        text = text.replace("\r", " ").replace("\n", " ")
        # (2) wrap the text into 8000-char chunks
        wrapped_text = textwrap.fill(text, 8000)
        wrapped_inputs = wrapped_text.split("\n")
        # (3) send the chunks to the input_parsing session and join the results
        summarized_content = ""
        for wrapped_input in wrapped_inputs:
            # BUG FIX: use integer division so the prompt does not contain a
            # float like "2666.6666666666665 words".
            word_limit = f"Please ensure that the input is less than {8000 // len(wrapped_inputs)} words.\n"
            summarized_content += self.chatGPTAgent.send_message(
                prefix + word_limit + wrapped_input, self.input_parsing_session_id
            )
        return summarized_content

    def test_generation_handler(self, text):
        """Send `text` to the test-generation session and return the reply."""
        response = self.chatGPTAgent.send_message(text, self.test_generation_session_id)
        return response

    def input_handler(self) -> str:
        """
        Request for user's input to:
            (1) input test results, (2) ask for todos, (3) input other
            information, (4) end.
        The design details are based on PentestGPT_design.md.

        Return
        -----
        response: str | bool
            The response from the chatGPT model, or False when the user quits
            (the main loop treats a falsy result as "end session").
        """
        self.chat_count += 1
        request_option = main_task_entry()
        if request_option == "help":
            print(mainTaskCompleter().task_details)
            # BUG FIX: `response` was left unbound here, crashing at the
            # final `return response`. Re-prompt after showing help.
            return self.input_handler()
        if request_option == "next":
            ## (1) pass the information to the input_parsing session.
            ## Give an option list for the user to choose the input source.
            options = list(self.postfix_options.keys())
            value_list = [
                (i, HTML(f'')) for i in range(len(options))
            ]
            source = prompt_select(
                title="Please choose the source of the information.", values=value_list
            )
            self.console.print(
                "Your input: (End with )", style="bold green"
            )
            user_input = prompt_ask("> ", multiline=True)
            with self.console.status("[bold green] PentestGPT Thinking...") as status:
                parsed_input = self.input_parsing_handler(
                    user_input, source=options[int(source)]
                )
                ## (2) pass the summarized information to the reasoning session.
                reasoning_response = self.reasoning_handler(parsed_input)
                self.step_reasoning_response = reasoning_response
            ## (3) print the results
            self.console.print(
                "Based on the analysis, the following tasks are recommended:",
                style="bold green",
            )
            self.console.print(reasoning_response + "\n")
            response = reasoning_response

        # generate more test details (beginner mode)
        elif request_option == "more":
            ## (1) pass the reasoning results to the test_generation session.
            if self.step_reasoning_response is None:
                self.console.print(
                    "You have not initialized the task yet. Please perform the basic testing following `next` option.",
                    style="bold red",
                )
                # BUG FIX: a bare `return` yielded None, which the main loop
                # interpreted as "quit". Re-prompt instead.
                return self.input_handler()
            with self.console.status("[bold green] PentestGPT Thinking...") as status:
                generation_response = self.test_generation_handler(
                    self.step_reasoning_response
                )
            self.console.print(
                "Below are the further details.",
                style="bold green",
            )
            self.console.print(generation_response + "\n")
            response = generation_response

        # ask for task list (to-do list)
        elif request_option == "todo":
            ## (1) ask the reasoning session to analyze the current situation,
            ## and list the top sub-tasks
            with self.console.status("[bold green] PentestGPT Thinking...") as status:
                reasoning_response = self.reasoning_handler(self.prompts.ask_todo)
                ## (2) pass the sub-tasks to the test_generation session.
                message = self.prompts.todo_to_command + "\n" + reasoning_response
                generation_response = self.test_generation_handler(message)
            ## (3) print the results
            self.console.print(
                "Based on the analysis, the following tasks are recommended:",
                style="bold green",
            )
            self.console.print(reasoning_response + "\n")
            self.console.print(
                "You can follow the instructions below to complete the tasks.",
                style="bold green",
            )
            self.console.print(generation_response + "\n")
            response = reasoning_response

        # pass other information, such as questions or some observations.
        elif request_option == "discuss":
            ## (1) Request for user multi-line input
            self.console.print("Please share your thoughts/questions with PentestGPT.")
            user_input = prompt_ask(
                "(End with ) Your input: ", multiline=True
            )
            ## (2) pass the information to the reasoning session.
            with self.console.status("[bold green] PentestGPT Thinking...") as status:
                response = self.reasoning_handler(self.prompts.discussion + user_input)
            ## (3) print the results
            self.console.print("PentestGPT:\n", style="bold green")
            self.console.print(response + "\n", style="yellow")

        # end
        elif request_option == "quit":
            response = False
            self.console.print("Thank you for using PentestGPT!", style="bold green")

        else:
            self.console.print("Please key in the correct options.", style="bold red")
            response = self.input_handler()
        return response

    def main(self):
        """
        The main function of pentestGPT. The design is based on
        PentestGPT_design.md.
        """
        # 0. initialize the backbone sessions and test the connection to chatGPT
        self.initialize()
        # 1. User firstly provides basic information of the task
        init_description = prompt_ask(
            "Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
            multiline=False,
        )
        ## Provide the information to the reasoning session for the task initialization.
        prefixed_init_description = self.prompts.task_description + init_description
        with self.console.status(
            "[bold green] Generating Task Information..."
        ) as status:
            _response = self.reasoning_handler(prefixed_init_description)
        self.console.print("- Task information generated. \n", style="bold green")
        # 2. Reasoning session generates the first thing to do and provides the
        #    information to the generation session
        with self.console.status("[bold green]Processing...") as status:
            first_generation_response = self.test_generation_handler(
                self.prompts.todo_to_command + self.prompts.first_todo
            )
        # 3. Show user the first thing to do.
        self.console.print(
            "PentestGPT suggests you to do the following: ", style="bold green"
        )
        self.console.print(_response)
        self.console.print("You may start with:", style="bold green")
        self.console.print(first_generation_response)

        # 4. enter the main loop.
        while True:
            result = self.input_handler()
            self.console.print(
                "-----------------------------------------", style="bold white"
            )
            if not result:
                # end the session
                break

        # Summarize the session and end
        # TODO.
        # clear the sessions
        # TODO.