mirror of
https://github.com/weyne85/PentestGPT.git
synced 2025-10-29 16:58:59 +00:00
517 lines
22 KiB
Python
517 lines
22 KiB
Python
# an automated penetration testing parser empowered by GPT
|
|
from config.chatgpt_config import ChatGPTConfig
|
|
from rich.spinner import Spinner
|
|
from utils.chatgpt import ChatGPT
|
|
from utils.chatgpt_api import ChatGPTAPI
|
|
from rich.console import Console
|
|
from prompts.prompt_class import PentestGPTPrompt
|
|
from utils.prompt_select import prompt_select, prompt_ask
|
|
from prompt_toolkit.formatted_text import HTML
|
|
from utils.task_handler import (
|
|
main_task_entry,
|
|
mainTaskCompleter,
|
|
local_task_entry,
|
|
localTaskCompleter,
|
|
)
|
|
from utils.web_parser import google_search, parse_web
|
|
import time
|
|
import datetime as dt
|
|
|
|
import loguru
|
|
import time, os, textwrap, json
|
|
|
|
# Module-wide logger: the shared loguru logger, with an extra file sink so
# that records are also kept on disk under logs/.
logger = loguru.logger
logger.add(sink="logs/pentest_gpt.log")
|
|
|
|
|
|
def prompt_continuation(width, line_number, wrap_count):
    """
    Build the text displayed in front of a continuation line of the prompt.

    Soft-wrapped lines (``wrap_count > 0``) get a ``-> `` marker padded on
    the left so that it ends at column ``width``. Every other line gets
    ``- <line_number + 1> - `` right-justified to ``width`` and wrapped in
    ``<strong>`` markup.

    Parameters:
    ----------
    width: int
        the width of the prompt, used to align the continuation text
    line_number: int
        the index of the line as received from the caller; 1 is added
        before it is displayed
    wrap_count: int
        greater than 0 when the line is a soft wrap of the previous one
    """
    is_soft_wrap = wrap_count > 0
    if is_soft_wrap:
        padding = " " * (width - 3)
        return padding + "-> "

    label = "- %i - " % (line_number + 1)
    aligned_label = label.rjust(width)
    return HTML("<strong>%s</strong>") % aligned_label
|
|
|
|
|
|
class pentestGPT:
|
|
# Extra instruction appended to the summarization prompt, keyed by the kind
# of source the user says the input came from.
postfix_options = {
    "tool": (
        "The input content is from a security testing tool. "
        "You need to list down all the points that are interesting to you; "
        "you should summarize it as if you are reporting to a senior "
        "penetration tester for further guidance.\n"
    ),
    "user-comments": "The input content is from user comments.\n",
    "web": (
        "The input content is from web pages. "
        "You need to summarize the readable-contents, and list down all the "
        "points that can be interesting for penetration testing.\n"
    ),
    "default": (
        "The user did not specify the input source. "
        "You need to summarize based on the contents.\n"
    ),
}
|
|
|
|
def __init__(self, reasoning_model="gpt-4", useAPI=False):
    """
    Create the two chat agents, the console helpers and the empty
    per-session state.

    Parameters:
    ----------
    reasoning_model: str
        the model name passed to the ChatGPTConfig of the reasoning agent
    useAPI: bool
        when it is exactly ``False`` both agents are built with
        ``ChatGPT``; otherwise both are built with ``ChatGPTAPI``
    """
    self.log_dir = "logs"
    # Both agents always use the same backend class; only the config differs.
    agent_cls = ChatGPT if useAPI is False else ChatGPTAPI
    self.chatGPTAgent = agent_cls(ChatGPTConfig())
    self.chatGPT4Agent = agent_cls(ChatGPTConfig(model=reasoning_model))
    self.prompts = PentestGPTPrompt
    self.console = Console()
    self.spinner = Spinner("line", "Processing")
    self.test_generation_session_id = None
    self.test_reasoning_session_id = None
    self.input_parsing_session_id = None
    self.chat_count = 0
    # the response from the reasoning session for the current step
    self.step_reasoning = None
    # input_handler stores and reads the latest reasoning response under
    # this name; it has to exist before the first `next` so that `more`
    # can detect "nothing to expand yet" instead of raising AttributeError.
    self.step_reasoning_response = None
    # the history of the current conversation: one list of
    # (timestamp, text) entries per source
    self.history = {
        "user": [],
        "pentestGPT": [],
        "reasoning": [],
        "input_parsing": [],
        "generation": [],
        "exception": [],
    }
|
|
|
|
def log_conversation(self, source, text):
    """
    Record one conversation entry in ``self.history``.

    The entry is stored as a ``(timestamp, text)`` tuple in the list that
    belongs to ``source``. A ``source`` that is not a key of
    ``self.history`` is filed under ``"exception"`` instead.

    Parameters:
    ----------
    source: str
        the source of the conversation
    text: str
        the content of the conversation
    """
    entry = (time.time(), text)
    bucket = source if source in self.history else "exception"
    self.history[bucket].append(entry)
|
|
|
|
def initialize(self):
    """
    Open the three backbone sessions and store their session ids.

    The generation and input-parsing sessions are started on
    ``self.chatGPTAgent``; the reasoning session is started on
    ``self.chatGPT4Agent``. Each ``send_new_message`` call returns a
    ``(text, session_id)`` pair, of which only the id is kept.

    Any exception raised while opening the sessions is logged and reported
    on the console; it is not re-raised, so the session ids opened after
    the failing call stay ``None``.
    """
    initialized = False
    with self.console.status("[bold green] Initialize ChatGPT Sessions..."):
        try:
            _, self.test_generation_session_id = self.chatGPTAgent.send_new_message(
                self.prompts.generation_session_init,
            )
            _, self.test_reasoning_session_id = self.chatGPT4Agent.send_new_message(
                self.prompts.reasoning_session_init
            )
            _, self.input_parsing_session_id = self.chatGPTAgent.send_new_message(
                self.prompts.input_parsing_init
            )
            initialized = True
        except Exception as e:
            logger.error(e)

    # Only claim success when every session was actually opened.
    if initialized:
        self.console.print("- ChatGPT Sessions Initialized.", style="bold green")
    else:
        self.console.print(
            "- Failed to initialize ChatGPT sessions. Please check the log for details.",
            style="bold red",
        )
|
|
|
|
def reasoning_handler(self, text) -> str:
    """
    Send ``text`` to the reasoning session and return its response.

    Inputs longer than 8000 characters are first condensed with
    ``input_parsing_handler``. The message sent is
    ``self.prompts.process_results`` followed by the (possibly condensed)
    text, and the response is logged under ``"reasoning"``.

    Parameters:
    ----------
    text: str
        the content to be processed by the reasoning session

    Return
    -----
    response: str
        the response from the reasoning session
    """
    needs_summary = len(text) > 8000
    payload = self.input_parsing_handler(text) if needs_summary else text

    message = self.prompts.process_results + payload
    answer = self.chatGPT4Agent.send_message(message, self.test_reasoning_session_id)

    self.log_conversation("reasoning", answer)
    return answer
|
|
|
|
def input_parsing_handler(self, text, source=None) -> str:
    """
    Summarize ``text`` with the input-parsing session.

    The text is flattened to a single line, split into chunks of at most
    8000 characters, and every chunk is sent separately. The replies are
    concatenated in order, logged under ``"input_parsing"`` and returned.

    Parameters:
    ----------
    text: str
        the raw content to summarize
    source: str
        optional key of ``self.postfix_options``; when it matches, the
        corresponding hint is appended to the summarization prefix

    Return
    -----
    summarized_content: str
        the concatenation of the replies for all chunks
    """
    prefix = "Please summarize the following input. "
    # do some engineering trick here. Add postfix to the input to make it more understandable by LLMs.
    if source is not None and source in self.postfix_options:
        prefix += self.postfix_options[source]

    # The default token-size limit is 4096 (web UI even shorter). 1 token ~= 4 chars in English
    # Use textwrap to split inputs. Limit to 2000 token (8000 chars) for each input
    # (1) replace all the newlines with spaces
    flattened = text.replace("\r", " ").replace("\n", " ")
    # (2) wrap the text. An empty/blank input still yields one (empty) chunk,
    # so exactly one request is sent in that case.
    chunks = textwrap.wrap(flattened, 8000) or [""]

    # (3) send the inputs to chatGPT input_parsing_session and obtain the results
    # Integer division keeps the limit a whole number in the prompt
    # (true division would produce e.g. "2666.6666666666665 words").
    word_limit = (
        f"Please ensure that the input is less than {8000 // len(chunks)} words.\n"
    )
    summarized_content = "".join(
        self.chatGPTAgent.send_message(
            prefix + word_limit + chunk, self.input_parsing_session_id
        )
        for chunk in chunks
    )

    # log the conversation
    self.log_conversation("input_parsing", summarized_content)
    return summarized_content
|
|
|
|
def test_generation_handler(self, text):
    """
    Forward ``text`` to the test-generation session.

    The reply is logged under ``"generation"`` before it is returned.

    Parameters:
    ----------
    text: str
        the message to send to the test-generation session

    Return
    -----
    response: str
        the reply from the test-generation session
    """
    session_id = self.test_generation_session_id
    reply = self.chatGPTAgent.send_message(text, session_id)
    self.log_conversation("generation", reply)
    return reply
|
|
|
|
def local_input_handler(self) -> str:
    """
    Request for user's input to handle the local task

    Return
    -----
    local_task_response: str
        ``"continue"`` when the user leaves the local task, the reply of
        the generation session for ``discuss`` / ``brainstorm``, the
        placeholder text for ``google``, and an empty string for ``help``
        or any other option.
    """
    local_task_response = ""
    self.chat_count += 1
    local_request_option = local_task_entry()
    self.log_conversation("user", local_request_option)

    if local_request_option == "help":
        print(localTaskCompleter().task_details)

    elif local_request_option == "discuss":
        local_task_response = self._local_chat(
            "Please share your findings and questions with PentestGPT.",
            self.prompts.local_task_prefix,
        )

    elif local_request_option == "brainstorm":
        local_task_response = self._local_chat(
            "Please share your concerns and questions with PentestGPT.",
            self.prompts.local_task_brainstorm,
        )

    elif local_request_option == "google":
        # get the users input
        self.console.print(
            "Please enter your search query. PentestGPT will summarize the info from google. (End with <shift + right-arrow>) ",
            style="bold green",
        )
        self.log_conversation(
            "pentestGPT",
            "Please enter your search query. PentestGPT will summarize the info from google.",
        )
        user_input = prompt_ask("Your input: ", multiline=False)
        self.log_conversation("user", user_input)
        with self.console.status("[bold green] PentestGPT Thinking..."):
            # query the question (5 results by default). The search result
            # is not consumed yet: summarizing it is still a TODO.
            google_search(user_input, 5)
            local_task_response = (
                "Google search results:\n" + "still under development."
            )
        self.console.print(local_task_response + "\n", style="yellow")
        self.log_conversation("pentestGPT", local_task_response)

    elif local_request_option == "continue":
        self.console.print("Exit the local task and continue the main task.")
        self.log_conversation(
            "pentestGPT", "Exit the local task and continue the main task."
        )
        local_task_response = "continue"

    return local_task_response

def _local_chat(self, invitation, task_prefix) -> str:
    """
    Run one discuss/brainstorm round of the local task.

    Prints ``invitation``, reads a multi-line user input, sends
    ``task_prefix + user_input`` to the test-generation session, prints
    the reply and logs every step. Returns the reply.
    """
    ## (1) Request for user multi-line input
    self.console.print(invitation)
    self.log_conversation(
        "pentestGPT", invitation + " (End with <shift + right-arrow>)"
    )
    user_input = prompt_ask("Your input: ", multiline=True)
    self.log_conversation("user", user_input)
    ## (2) pass the information to the test-generation session.
    with self.console.status("[bold green] PentestGPT Thinking..."):
        reply = self.test_generation_handler(task_prefix + user_input)
    ## (3) print the results
    self.console.print("PentestGPT:\n", style="bold green")
    self.console.print(reply + "\n", style="yellow")
    self.log_conversation("pentestGPT", reply)
    return reply
|
|
|
|
def input_handler(self) -> str:
    """
    Request for user's input to:
        (1) input test results (next),
        (2) generate more test details and enter the sub-task mode (more),
        (3) ask for todos (todo),
        (4) input other information (discuss),
        (5) google,
        (6) end (quit).
    `help` prints the available options.
    The design details are based on PentestGPT_design.md

    Return
    -----
    response: str
        The response from the chatGPT model, or a short status message
        when no model was queried. ``False`` is returned for `quit`; the
        main loop stops on a falsy return value.
    """
    self.chat_count += 1

    request_option = main_task_entry()
    # Every option is logged here once; the branches below do not log the
    # option again.
    self.log_conversation("user", request_option)

    if request_option == "help":
        print(mainTaskCompleter().task_details)
        # Must be truthy: a falsy return value ends the main loop.
        response = "Help message displayed."

    elif request_option == "next":
        ## (1) pass the information to input_parsing session.
        ## Give a option list for user to choose from
        options = list(self.postfix_options)
        value_list = [
            (index, HTML(f'<style fg="cyan">{option}</style>'))
            for index, option in enumerate(options)
        ]
        source = prompt_select(
            title="Please choose the source of the information.", values=value_list
        )
        self.console.print(
            "Your input: (End with <shift + right-arrow>)", style="bold green"
        )
        user_input = prompt_ask("> ", multiline=True)
        selected_source = options[int(source)]
        self.log_conversation(
            "user", "Source: " + selected_source + "\n" + user_input
        )
        with self.console.status("[bold green] PentestGPT Thinking..."):
            parsed_input = self.input_parsing_handler(
                user_input, source=selected_source
            )
            ## (2) pass the summarized information to the reasoning session.
            reasoning_response = self.reasoning_handler(parsed_input)
            # remembered so that `more` can expand on it later
            self.step_reasoning_response = reasoning_response

        ## (3) print the results
        self.console.print(
            "Based on the analysis, the following tasks are recommended:",
            style="bold green",
        )
        self.console.print(reasoning_response + "\n")
        self.log_conversation(
            "pentestGPT",
            "Based on the analysis, the following tasks are recommended:"
            + reasoning_response,
        )
        response = reasoning_response

    # generate more test details (beginner mode)
    elif request_option == "more":
        ## (1) check if reasoning session is initialized
        # getattr: the attribute may be missing until `next` has run once.
        step_reasoning_response = getattr(self, "step_reasoning_response", None)
        if step_reasoning_response is None:
            response = "You have not initialized the task yet. Please perform the basic testing following `next` option."
            self.console.print(response, style="bold red")
            self.log_conversation("pentestGPT", response)
            return response
        ## (2) start local task generation.
        ### (2.1) ask the reasoning session to analyze the current situation, and explain the task
        self.console.print(
            "PentestGPT will generate more test details, and enter the sub-task generation mode. (Pressing Enter to continue)",
            style="bold green",
        )
        self.log_conversation(
            "pentestGPT",
            "PentestGPT will generate more test details, and enter the sub-task generation mode.",
        )
        input()

        ### (2.2) pass the sub-tasks to the test generation session
        with self.console.status("[bold green] PentestGPT Thinking..."):
            generation_response = self.test_generation_handler(
                step_reasoning_response
            )
            # Sent for its effect on the generation session; the reply is
            # not used.
            self.test_generation_handler(self.prompts.local_task_init)

        self.console.print(
            "Below are the further details.",
            style="bold green",
        )
        self.console.print(generation_response + "\n")
        response = generation_response
        self.log_conversation("pentestGPT", response)

        ### (2.3) local task handler: loop until the user chooses `continue`
        while True:
            local_task_response = self.local_input_handler()
            if local_task_response == "continue":
                break

    # ask for task list (to-do list)
    elif request_option == "todo":
        ## (1) ask the reasoning session to analyze the current situation, and list the top sub-tasks
        with self.console.status("[bold green] PentestGPT Thinking..."):
            reasoning_response = self.reasoning_handler(self.prompts.ask_todo)
            ## (2) pass the sub-tasks to the test_generation session.
            message = self.prompts.todo_to_command + "\n" + reasoning_response
            generation_response = self.test_generation_handler(message)
        ## (3) print the results
        self.console.print(
            "Based on the analysis, the following tasks are recommended:",
            style="bold green",
        )
        self.console.print(reasoning_response + "\n")
        self.console.print(
            "You can follow the instructions below to complete the tasks.",
            style="bold green",
        )
        self.console.print(generation_response + "\n")
        response = reasoning_response
        self.log_conversation(
            "pentestGPT",
            "Based on the analysis, the following tasks are recommended:"
            + reasoning_response
            + "\n"
            + "You can follow the instructions below to complete the tasks."
            + generation_response,
        )

    # pass other information, such as questions or some observations.
    elif request_option == "discuss":
        ## (1) Request for user multi-line input
        self.console.print(
            "Please share your thoughts/questions with PentestGPT. (End with <shift + right-arrow>) "
        )
        self.log_conversation(
            "pentestGPT", "Please share your thoughts/questions with PentestGPT."
        )
        user_input = prompt_ask("Your input: ", multiline=True)
        self.log_conversation("user", user_input)
        ## (2) pass the information to the reasoning session.
        with self.console.status("[bold green] PentestGPT Thinking..."):
            response = self.reasoning_handler(self.prompts.discussion + user_input)
        ## (3) print the results
        self.console.print("PentestGPT:\n", style="bold green")
        self.console.print(response + "\n", style="yellow")
        self.log_conversation("pentestGPT", response)

    # Google
    elif request_option == "google":
        # get the users input
        self.console.print(
            "Please enter your search query. PentestGPT will summarize the info from google. (End with <shift + right-arrow>) ",
            style="bold green",
        )
        self.log_conversation(
            "pentestGPT",
            "Please enter your search query. PentestGPT will summarize the info from google.",
        )
        user_input = prompt_ask("Your input: ", multiline=False)
        self.log_conversation("user", user_input)
        with self.console.status("[bold green] PentestGPT Thinking..."):
            # query the question (5 results by default). The search result
            # is not consumed yet: summarizing it is still a TODO.
            google_search(user_input, 5)
            response = "Google search results:\n" + "still under development."
        self.console.print(response + "\n", style="yellow")
        self.log_conversation("pentestGPT", response)

    # end
    elif request_option == "quit":
        response = False
        self.console.print("Thank you for using PentestGPT!", style="bold green")
        self.log_conversation("pentestGPT", "Thank you for using PentestGPT!")

    else:
        self.console.print("Please key in the correct options.", style="bold red")
        self.log_conversation("pentestGPT", "Please key in the correct options.")
        response = "Please key in the correct options."

    return response
|
|
|
|
def main(self):
    """
    The main function of pentestGPT. The design is based on PentestGPT_design.md

    Steps: initialize the sessions, ask the user for a one-line task
    description, show the first suggestions, then loop on
    ``input_handler`` until it returns a falsy value or raises. Finally
    the conversation history is dumped as JSON into ``self.log_dir``.
    """
    # 0. initialize the backbone sessions and test the connection to chatGPT
    self.initialize()

    # 1. User firstly provide basic information of the task
    init_description = prompt_ask(
        "Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
        multiline=False,
    )
    self.log_conversation("user", init_description)
    ## Provide the information to the reasoning session for the task initialization.
    prefixed_init_description = self.prompts.task_description + init_description
    with self.console.status("[bold green] Generating Task Information..."):
        _response = self.reasoning_handler(prefixed_init_description)
    self.console.print("- Task information generated. \n", style="bold green")

    # 2. Ask the generation session for the first thing to do
    with self.console.status("[bold green]Processing..."):
        first_generation_response = self.test_generation_handler(
            self.prompts.todo_to_command + self.prompts.first_todo
        )

    # 3. Show user the first thing to do.
    # The history key is "pentestGPT"; any other spelling would be filed
    # under "exception" by log_conversation.
    self.console.print(
        "PentestGPT suggests you to do the following: ", style="bold green"
    )
    self.console.print(_response)
    self.log_conversation(
        "pentestGPT", "PentestGPT suggests you to do the following: \n" + _response
    )
    self.console.print("You may start with:", style="bold green")
    self.console.print(first_generation_response)
    self.log_conversation(
        "pentestGPT", "You may start with: \n" + first_generation_response
    )

    # 4. enter the main loop.
    while True:
        try:
            result = self.input_handler()
            self.console.print(
                "-----------------------------------------", style="bold white"
            )
            if not result:  # end the session
                break
        except Exception as e:  # catch all general exception.
            # log the exception
            self.log_conversation("exception", str(e))
            # print the exception
            self.console.print("Exception: " + str(e), style="bold red")
            # safely quit the session
            break

    # Summarize the session and end
    # TODO.
    # log the session.
    ## save self.history into a txt file based on timestamp
    timestamp = time.time()
    log_name = "pentestGPT_log_" + str(timestamp) + ".txt"
    # save it in the logs folder; create the folder first so that the
    # history is not lost when it does not exist yet
    os.makedirs(self.log_dir, exist_ok=True)
    log_path = os.path.join(self.log_dir, log_name)
    with open(log_path, "w") as f:
        json.dump(self.history, f)

    # clear the sessions
    # TODO.
|