# PentestGPT/utils/pentest_gpt.py
#
# 392 lines
# 17 KiB
# Python

# an automated penetration testing parser empowered by GPT
from config.chatgpt_config import ChatGPTConfig
from rich.spinner import Spinner
from utils.chatgpt import ChatGPT
from rich.console import Console
from prompts.prompt_class import PentestGPTPrompt
from utils.prompt_select import prompt_select, prompt_ask
from prompt_toolkit.formatted_text import HTML
from utils.task_handler import main_task_entry, mainTaskCompleter
from utils.web_parser import google_search, parse_web
import time
import datetime as dt
import loguru
import time, os, textwrap, json
# Shared loguru logger for this module. The sink added below makes log records
# get written to "logs/pentest_gpt.log" (a path relative to the working
# directory); this runs as a side effect of importing the module.
logger = loguru.logger
logger.add(sink="logs/pentest_gpt.log")
def prompt_continuation(width, line_number, wrap_count):
    """
    Build the continuation prompt shown in front of follow-up input lines.

    The continuation does not have to be as wide as the prompt displayed
    before the first line, but here it is aligned to it.

    Parameters:
    ----------
    width: int
        the width of the prompt that precedes the first line
    line_number: int
        zero-based index of the line being prefixed
    wrap_count: int
        how many times the current line has been soft-wrapped so far

    Return
    -----
    For a soft wrap (``wrap_count > 0``) a plain string made of padding
    followed by "-> ". Otherwise the one-based line number, right-justified
    to `width` and wrapped in a <strong> tag as HTML formatted text.
    """
    if wrap_count > 0:
        # Soft-wrapped segment: pad so that the arrow ends at the prompt width.
        return " " * (width - 3) + "-> "
    # First segment of a new line: show its one-based number.
    text = ("- %i - " % (line_number + 1)).rjust(width)
    return HTML("<strong>%s</strong>") % text
class pentestGPT:
    """
    Interactive driver that coordinates three ChatGPT sessions.

    * generation session    - asked to turn tasks into further test details
    * reasoning session     - uses the "gpt-4" agent to process results and todos
    * input parsing session - summarizes long tool / web / user input

    Every exchange is recorded in `self.history`, which `main` dumps to a
    JSON file in `self.log_dir` when the session ends.
    """

    # Instruction appended to the summarization prompt, keyed by input source.
    postfix_options = {
        "tool": "The input content is from a security testing tool. You need to list down all the points that are interesting to you; you should summarize it as if you are reporting to a senior penetration tester for further guidance.\n",
        "user-comments": "The input content is from user comments.\n",
        "web": "The input content is from web pages. You need to summarize the readable-contents, and list down all the points that can be interesting for penetration testing.\n",
        "default": "The user did not specify the input source. You need to summarize based on the contents.\n",
    }

    # The default token-size limit is 4096 (web UI even shorter) and 1 token is
    # roughly 4 chars in English, so each request is limited to 2000 tokens
    # (8000 chars).
    _MAX_INPUT_CHARS = 8000

    def __init__(self):
        self.log_dir = "logs"
        self.chatGPTAgent = ChatGPT(ChatGPTConfig())
        self.chatGPT4Agent = ChatGPT(ChatGPTConfig(model="gpt-4"))
        self.prompts = PentestGPTPrompt
        self.console = Console()
        self.spinner = Spinner("line", "Processing")
        self.test_generation_session_id = None
        self.test_reasoning_session_id = None
        self.input_parsing_session_id = None
        self.chat_count = 0
        # Not read anywhere in this class; kept so existing attribute access
        # from outside keeps working.
        self.step_reasoning = None
        # The response from the reasoning session for the current step. It must
        # exist before the first `next`, because the `more` option reads it.
        self.step_reasoning_response = None
        # the history of the current conversation
        self.history = {
            "user": [],
            "pentestGPT": [],
            "reasoning": [],
            "input_parsing": [],
            "generation": [],
            "exception": [],
        }

    def log_conversation(self, source, text):
        """
        append the conversation into the history

        Parameters:
        ----------
        source: str
            the source of the conversation; a value that is not a key of
            `self.history` is recorded under "exception"
        text: str
            the content of the conversation
        """
        timestamp = time.time()
        if source not in self.history:
            # an exception
            source = "exception"
        self.history[source].append((timestamp, text))

    def initialize(self):
        """
        Initialize the backbone sessions and test the connection to chatGPT.

        Three sessions are created: test generation, test reasoning and input
        parsing; their ids are stored on the instance. A failure is logged
        instead of raised, and the success message is only printed when all
        three sessions were created.
        """
        with self.console.status("[bold green] Initialize ChatGPT Sessions..."):
            try:
                (
                    _text_0,
                    self.test_generation_session_id,
                ) = self.chatGPTAgent.send_new_message(
                    self.prompts.generation_session_init,
                )
                (
                    _text_1,
                    self.test_reasoning_session_id,
                ) = self.chatGPT4Agent.send_new_message(
                    self.prompts.reasoning_session_init
                )
                (
                    _text_2,
                    self.input_parsing_session_id,
                ) = self.chatGPTAgent.send_new_message(self.prompts.input_parsing_init)
            except Exception as e:
                logger.error(e)
                initialized = False
            else:
                initialized = True
        if initialized:
            self.console.print("- ChatGPT Sessions Initialized.", style="bold green")
        else:
            # Do not claim success when a session could not be created.
            self.console.print(
                "- Failed to initialize ChatGPT Sessions. Please check the log for details.",
                style="bold red",
            )

    def reasoning_handler(self, text) -> str:
        """
        Send `text` to the reasoning session and return its response.

        Text longer than 8000 characters is summarized by the input parsing
        session first. The response is logged under "reasoning".
        """
        # summarize the contents if necessary.
        if len(text) > self._MAX_INPUT_CHARS:
            text = self.input_parsing_handler(text)
        # pass the information to the reasoning session and obtain the results
        response = self.chatGPT4Agent.send_message(
            self.prompts.process_results + text, self.test_reasoning_session_id
        )
        # log the conversation
        self.log_conversation("reasoning", response)
        return response

    def input_parsing_handler(self, text, source=None) -> str:
        """
        Summarize `text` with the input parsing session.

        Parameters:
        ----------
        text: str
            the raw content to summarize
        source: str, optional
            a key of `postfix_options`; when given and known, the matching
            instruction is added to the summarization prompt

        Return
        -----
        summarized_content: str
            the concatenated responses, one per chunk of at most 8000 chars
        """
        prefix = "Please summarize the following input. "
        # do some engineering trick here. Add postfix to the input to make it
        # more understandable by LLMs.
        if source is not None and source in self.postfix_options:
            prefix = prefix + self.postfix_options[source]
        # (1) replace all the newlines with spaces
        text = text.replace("\r", " ").replace("\n", " ")
        # (2) wrap the text; empty input still yields one (empty) chunk
        wrapped_inputs = textwrap.wrap(text, self._MAX_INPUT_CHARS) or [""]
        # Integer division keeps the number in the prompt clean ("2666" rather
        # than "2666.6666666666665"); it is the same for every chunk.
        word_limit = f"Please ensure that the input is less than {self._MAX_INPUT_CHARS // len(wrapped_inputs)} words.\n"
        # (3) send the inputs to chatGPT input_parsing_session and obtain the results
        summarized_content = "".join(
            self.chatGPTAgent.send_message(
                prefix + word_limit + wrapped_input, self.input_parsing_session_id
            )
            for wrapped_input in wrapped_inputs
        )
        # log the conversation
        self.log_conversation("input_parsing", summarized_content)
        return summarized_content

    def test_generation_handler(self, text):
        """
        Send `text` to the test generation session and return its response.

        The response is logged under "generation".
        """
        response = self.chatGPTAgent.send_message(text, self.test_generation_session_id)
        # log the conversation
        self.log_conversation("generation", response)
        return response

    def input_handler(self) -> str:
        """
        Request for user's input to: (1) input test results, (2) ask for todos, (3) input other information, (4) end.
        The design details are based on PentestGPT_design.md

        Return
        -----
        response: str
            The message produced for the chosen option. For the `quit` option
            the value is `False`, which tells `main` to stop its loop.
        """
        self.chat_count += 1
        request_option = main_task_entry()
        self.log_conversation("user", request_option)

        if request_option == "help":
            # Part of the chain below so that showing the help does not also
            # report an invalid option.
            print(mainTaskCompleter().task_details)
            response = "Help information displayed."
            self.log_conversation("pentestGPT", response)
        # pass output
        elif request_option == "next":
            ## (1) pass the information to input_parsing session.
            ## Give a option list for user to choose from
            options = list(self.postfix_options.keys())
            value_list = [
                (index, HTML(f'<style fg="cyan">{option}</style>'))
                for index, option in enumerate(options)
            ]
            source = prompt_select(
                title="Please choose the source of the information.", values=value_list
            )
            self.console.print(
                "Your input: (End with <shift + right-arrow>)", style="bold green"
            )
            user_input = prompt_ask("> ", multiline=True)
            chosen_source = options[int(source)]
            # "user" is the history bucket for everything typed by the user.
            self.log_conversation(
                "user", "Source: " + chosen_source + "\n" + user_input
            )
            with self.console.status("[bold green] PentestGPT Thinking..."):
                parsed_input = self.input_parsing_handler(
                    user_input, source=chosen_source
                )
                ## (2) pass the summarized information to the reasoning session.
                reasoning_response = self.reasoning_handler(parsed_input)
                self.step_reasoning_response = reasoning_response
            ## (3) print the results
            self.console.print(
                "Based on the analysis, the following tasks are recommended:",
                style="bold green",
            )
            self.console.print(reasoning_response + "\n")
            self.log_conversation(
                "pentestGPT",
                "Based on the analysis, the following tasks are recommended:"
                + reasoning_response,
            )
            response = reasoning_response
        # generate more test details (beginner mode)
        elif request_option == "more":
            ## (1) pass the reasoning results to the test_generation session.
            if self.step_reasoning_response is None:
                self.console.print(
                    "You have not initialized the task yet. Please perform the basic testing following `next` option.",
                    style="bold red",
                )
                response = "You have not initialized the task yet. Please perform the basic testing following `next` option."
                self.log_conversation("pentestGPT", response)
                return response
            with self.console.status("[bold green] PentestGPT Thinking..."):
                generation_response = self.test_generation_handler(
                    self.step_reasoning_response
                )
            self.console.print(
                "Below are the further details.",
                style="bold green",
            )
            self.console.print(generation_response + "\n")
            response = generation_response
            self.log_conversation("pentestGPT", response)
        # ask for task list (to-do list)
        elif request_option == "todo":
            ## (1) ask the reasoning session to analyze the current situation, and list the top sub-tasks
            with self.console.status("[bold green] PentestGPT Thinking..."):
                reasoning_response = self.reasoning_handler(self.prompts.ask_todo)
                ## (2) pass the sub-tasks to the test_generation session.
                message = self.prompts.todo_to_command + "\n" + reasoning_response
                generation_response = self.test_generation_handler(message)
            ## (3) print the results
            self.console.print(
                "Based on the analysis, the following tasks are recommended:",
                style="bold green",
            )
            self.console.print(reasoning_response + "\n")
            self.console.print(
                "You can follow the instructions below to complete the tasks.",
                style="bold green",
            )
            self.console.print(generation_response + "\n")
            response = reasoning_response
            self.log_conversation(
                "pentestGPT",
                "Based on the analysis, the following tasks are recommended:"
                + reasoning_response
                + "\n"
                + "You can follow the instructions below to complete the tasks."
                + generation_response,
            )
        # pass other information, such as questions or some observations.
        elif request_option == "discuss":
            ## (1) Request for user multi-line input
            self.console.print("Please share your thoughts/questions with PentestGPT.")
            self.log_conversation(
                "pentestGPT", "Please share your thoughts/questions with PentestGPT."
            )
            user_input = prompt_ask(
                "(End with <shift + right-arrow>) Your input: ", multiline=True
            )
            self.log_conversation("user", user_input)
            ## (2) pass the information to the reasoning session.
            with self.console.status("[bold green] PentestGPT Thinking..."):
                response = self.reasoning_handler(self.prompts.discussion + user_input)
            ## (3) print the results
            self.console.print("PentestGPT:\n", style="bold green")
            self.console.print(response + "\n", style="yellow")
            self.log_conversation("pentestGPT", response)
        # Google
        elif request_option == "google":
            # get the users input
            self.console.print(
                "Please enter your search query. PentestGPT will summarize the info from google.",
                style="bold green",
            )
            self.log_conversation(
                "pentestGPT",
                "Please enter your search query. PentestGPT will summarize the info from google.",
            )
            user_input = prompt_ask(
                "(End with <shift + right-arrow>) Your input: ", multiline=False
            )
            self.log_conversation("user", user_input)
            with self.console.status("[bold green] PentestGPT Thinking..."):
                # query the question
                # NOTE: the search result is not consumed yet (see TODO below).
                result: dict = google_search(user_input, 5)  # 5 results by default
                # summarize the results
                # TODO
                response = "Google search results:\n" + "still under development."
            self.console.print(response + "\n", style="yellow")
            self.log_conversation("pentestGPT", response)
            return response
        # end
        elif request_option == "quit":
            response = False
            self.console.print("Thank you for using PentestGPT!", style="bold green")
            self.log_conversation("pentestGPT", "Thank you for using PentestGPT!")
        else:
            self.console.print("Please key in the correct options.", style="bold red")
            self.log_conversation("pentestGPT", "Please key in the correct options.")
            response = "Please key in the correct options."
        return response

    def main(self):
        """
        The main function of pentestGPT. The design is based on PentestGPT_design.md

        Initializes the sessions, asks for a one-line task description, shows
        the first suggestions, loops over `input_handler` until it returns a
        falsy value, and finally saves `self.history` as JSON in `self.log_dir`.
        """
        # 0. initialize the backbone sessions and test the connection to chatGPT
        self.initialize()
        # 1. User firstly provide basic information of the task
        init_description = prompt_ask(
            "Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
            multiline=False,
        )
        self.log_conversation("user", init_description)
        ## Provide the information to the reasoning session for the task initialization.
        prefixed_init_description = self.prompts.task_description + init_description
        with self.console.status("[bold green] Generating Task Information..."):
            _response = self.reasoning_handler(prefixed_init_description)
        self.console.print("- Task information generated. \n", style="bold green")
        # 2. Reasoning session generates the first thing to do and provide the information to the generation session
        with self.console.status("[bold green]Processing..."):
            first_generation_response = self.test_generation_handler(
                self.prompts.todo_to_command + self.prompts.first_todo
            )
        # 3. Show user the first thing to do.
        self.console.print(
            "PentestGPT suggests you to do the following: ", style="bold green"
        )
        self.console.print(_response)
        # The history key is spelled "pentestGPT"; any other spelling would be
        # filed under "exception".
        self.log_conversation(
            "pentestGPT", "PentestGPT suggests you to do the following: \n" + _response
        )
        self.console.print("You may start with:", style="bold green")
        self.console.print(first_generation_response)
        self.log_conversation(
            "pentestGPT", "You may start with: \n" + first_generation_response
        )
        # 4. enter the main loop.
        while True:
            result = self.input_handler()
            self.console.print(
                "-----------------------------------------", style="bold white"
            )
            if not result:  # end the session
                break
        # Summarize the session and end
        # TODO.
        # log the session.
        ## save self.history into a txt file based on timestamp
        timestamp = time.time()
        log_name = "pentestGPT_log_" + str(timestamp) + ".txt"
        # save it in the logs folder, creating the folder when it is missing
        os.makedirs(self.log_dir, exist_ok=True)
        log_path = os.path.join(self.log_dir, log_name)
        with open(log_path, "w", encoding="utf-8") as f:
            json.dump(self.history, f)
        # clear the sessions
        # TODO.