diff --git a/README.md b/README.md
index 2a6b885..ec54e31 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,7 @@
 # PentestGPT
 
+**We're testing PentestGPT on HackTheBox**. Follow
+
 ## Introduction
 
 **PentestGPT** is a penetration testing tool empowered by **ChatGPT**. It is designed to automate the penetration testing process. It is built on top of ChatGPT and operates in an interactive mode to guide penetration testers in both overall progress and specific operations. A sample testing process of **PentestGPT** on a target VulnHub machine (Hackable II) is available [here](./resources/PentestGPT_Hackable2.pdf).
diff --git a/requirements.txt b/requirements.txt
index 937b561..f850b25 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,4 +10,5 @@ colorama
 rich
 prompt-toolkit
 google
-pytest
\ No newline at end of file
+pytest
+openai
\ No newline at end of file
diff --git a/utils/chatgpt.py b/utils/chatgpt.py
index bbc8130..4472390 100644
--- a/utils/chatgpt.py
+++ b/utils/chatgpt.py
@@ -68,9 +68,9 @@ class ChatGPT:
         self.conversation_dict: Dict[str, Conversation] = {}
         self.headers = dict(
             {
-                "cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
-                # "cookie": self.config.cookie,
-                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
+                # "cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
+                "cookie": self.config.cookie,
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
                 "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"
                 # 'Content-Type': 'text/event-stream; charset=utf-8',
             }
@@ -79,6 +79,8 @@ class ChatGPT:
 
     def get_authorization(self):
         url = "https://chat.openai.com/api/auth/session"
+        if "cookie" in vars(self.config):
+            self.headers["cookie"] = self.config.cookie
         r = requests.get(url, headers=self.headers)
         authorization = r.json()["accessToken"]
         # authorization = self.config.accessToken
diff --git a/utils/chatgpt_browser.py b/utils/chatgpt_browser.py
index c796f85..955409a 100644
--- a/utils/chatgpt_browser.py
+++ b/utils/chatgpt_browser.py
@@ -1,3 +1,4 @@
+# This file is deprecated. It is not used in the project.
 # -*- coding: utf-8 -*-
 
 import json
@@ -5,12 +6,13 @@
 import re
 import time
 from uuid import uuid1
 import datetime
+from chatgpt_wrapper import OpenAIAPI
 
 import loguru
 import requests
 from chatgpt_wrapper import ChatGPT
-from chatgpt_wrapper.config import Config
+from config.chatgpt_config import ChatGPTConfig
 
 logger = loguru.logger
 
@@ -22,7 +24,7 @@ class ChatGPTBrowser:
     """
 
     def __init__(self, model=None):
-        config = Config()
+        config = ChatGPTConfig()
         if model is not None:
             config.set("chat.model", model)
         self.bot = ChatGPT(config)
@@ -48,6 +50,7 @@ class ChatGPTBrowser:
 
     def send_message(self, message, conversation_id):  # send a message in the conversation window
         # TODO: send message from browser
+        # check here: https://github.com/mmabrouk/chatgpt-wrapper/blob/bafd0be7fb3355ea4a4b0276ade9f0fc6e8571fd/chatgpt_wrapper/backends/openai/repl.py#L101
         return
 
     def extract_code_fragments(self, text):
@@ -61,7 +64,14 @@
 
 
 if __name__ == "__main__":
-    chatgptBrowser_session = ChatGPTBrowser()
-    text, conversation_id = chatgptBrowser_session.send_new_message(
-        "I am a new tester for RESTful APIs."
-    )
+    # chatgptBrowser_session = ChatGPTBrowser()
+    # text, conversation_id = chatgptBrowser_session.send_new_message(
+    #     "I am a new tester for RESTful APIs."
+    # )
+
+    bot = OpenAIAPI()
+    success, response, message = bot.ask("Hello, world!")
+    if success:
+        print(response)
+    else:
+        raise RuntimeError(message)
diff --git a/utils/pentest_gpt.py b/utils/pentest_gpt.py
index a53e03a..20b46f4 100644
--- a/utils/pentest_gpt.py
+++ b/utils/pentest_gpt.py
@@ -8,9 +8,11 @@ from utils.prompt_select import prompt_select, prompt_ask
 from prompt_toolkit.formatted_text import HTML
 from utils.task_handler import main_task_entry, mainTaskCompleter
 from utils.web_parser import google_search, parse_web
+import time
+import datetime as dt
 
 import loguru
-import time, os, textwrap
+import time, os, textwrap, json
 
 logger = loguru.logger
 logger.add(sink="logs/pentest_gpt.log")
@@ -53,6 +55,32 @@ class pentestGPT:
         self.step_reasoning = (
             None  # the response from the reasoning session for the current step
         )
+        self.history = {
+            "user": [],
+            "pentestGPT": [],
+            "reasoning": [],
+            "input_parsing": [],
+            "generation": [],
+            "exception": [],
+        }  # the history of the current conversation
+
+    def log_conversation(self, source, text):
+        """
+        append the conversation into the history
+
+        Parameters:
+        ----------
+        source: str
+            the source of the conversation
+        text: str
+            the content of the conversation
+        """
+        # append the conversation into the history
+        timestamp = time.time()
+        if source not in self.history.keys():
+            # an exception
+            source = "exception"
+        self.history[source].append((timestamp, text))
 
     def initialize(self):
         # initialize the backbone sessions and test the connection to chatGPT
@@ -89,6 +117,8 @@ class pentestGPT:
         response = self.chatGPT4Agent.send_message(
             self.prompts.process_results + text, self.test_reasoning_session_id
         )
+        # log the conversation
+        self.log_conversation("reasoning", response)
         return response
 
     def input_parsing_handler(self, text, source=None) -> str:
@@ -110,12 +140,15 @@ class pentestGPT:
             summarized_content += self.chatGPTAgent.send_message(
                 prefix + word_limit + wrapped_input, self.input_parsing_session_id
             )
+        # log the conversation
+        self.log_conversation("input_parsing", summarized_content)
         return summarized_content
 
     def test_generation_handler(self, text):
         # send the contents to chatGPT test_generation_session and obtain the results
         response = self.chatGPTAgent.send_message(text, self.test_generation_session_id)
-        # print the results
+        # log the conversation
+        self.log_conversation("generation", response)
         return response
 
     def input_handler(self) -> str:
@@ -131,6 +164,7 @@ class pentestGPT:
 
         self.chat_count += 1
         request_option = main_task_entry()
+        self.log_conversation("user", request_option)
         # request_option = prompt_select(
         #     title=f"({self.chat_count}) > Please select your options with cursor: ",
         #     values=[
@@ -141,6 +175,7 @@ class pentestGPT:
         #     ],
         # )
         # pass output
+
         if request_option == "help":
             print(mainTaskCompleter().task_details)
 
@@ -159,6 +194,9 @@ class pentestGPT:
                 "Your input: (End with )", style="bold green"
             )
             user_input = prompt_ask("> ", multiline=True)
+            self.log_conversation(
+                "user_input", "Source: " + options[int(source)] + "\n" + user_input
+            )
             with self.console.status("[bold green] PentestGPT Thinking...") as status:
                 parsed_input = self.input_parsing_handler(
                     user_input, source=options[int(source)]
@@ -173,6 +211,11 @@ class pentestGPT:
                 style="bold green",
             )
             self.console.print(reasoning_response + "\n")
+            self.log_conversation(
+                "pentestGPT",
+                "Based on the analysis, the following tasks are recommended:"
+                + reasoning_response,
+            )
             response = reasoning_response
 
         # generate more test details (beginner mode)
@@ -183,7 +226,9 @@ class pentestGPT:
                 "You have not initialized the task yet. Please perform the basic testing following `next` option.",
                 style="bold red",
             )
-            return
+            response = "You have not initialized the task yet. Please perform the basic testing following `next` option."
+            self.log_conversation("pentestGPT", response)
+            return response
         with self.console.status("[bold green] PentestGPT Thinking...") as status:
             generation_response = self.test_generation_handler(
                 self.step_reasoning_response
@@ -195,6 +240,7 @@ class pentestGPT:
             )
             self.console.print(generation_response + "\n")
             response = generation_response
+            self.log_conversation("pentestGPT", response)
 
         # ask for task list (to-do list)
         elif request_option == "todo":
@@ -216,20 +262,33 @@ class pentestGPT:
             )
             self.console.print(generation_response + "\n")
             response = reasoning_response
+            self.log_conversation(
+                "pentestGPT",
+                "Based on the analysis, the following tasks are recommended:"
+                + reasoning_response
+                + "\n"
+                + "You can follow the instructions below to complete the tasks."
+                + generation_response,
+            )
 
         # pass other information, such as questions or some observations.
         elif request_option == "discuss":
             ## (1) Request for user multi-line input
             self.console.print("Please share your thoughts/questions with PentestGPT.")
+            self.log_conversation(
+                "pentestGPT", "Please share your thoughts/questions with PentestGPT."
+            )
             user_input = prompt_ask(
                 "(End with ) Your input: ", multiline=True
             )
+            self.log_conversation("user_input", user_input)
             ## (2) pass the information to the reasoning session.
             with self.console.status("[bold green] PentestGPT Thinking...") as status:
                 response = self.reasoning_handler(self.prompts.discussion + user_input)
             ## (3) print the results
             self.console.print("PentestGPT:\n", style="bold green")
             self.console.print(response + "\n", style="yellow")
+            self.log_conversation("pentestGPT", response)
 
         # Google
         elif request_option == "google":
@@ -238,22 +297,34 @@ class pentestGPT:
                 "Please enter your search query. PentestGPT will summarize the info from google.",
                 style="bold green",
             )
+            self.log_conversation(
+                "pentestGPT",
+                "Please enter your search query. PentestGPT will summarize the info from google.",
+            )
             user_input = prompt_ask(
                 "(End with ) Your input: ", multiline=False
             )
+            self.log_conversation("user_input", user_input)
             with self.console.status("[bold green] PentestGPT Thinking...") as status:
                 # query the question
-                result = self.google_search(user_input, 5)  # 5 results by default
+                result: dict = google_search(user_input, 5)  # 5 results by default
+                # summarize the results
+                # TODO
+            response = "Google search results:\n" + "still under development."
+            self.console.print(response + "\n", style="yellow")
+            self.log_conversation("pentestGPT", response)
+            return response
 
         # end
         elif request_option == "quit":
            response = False
            self.console.print("Thank you for using PentestGPT!", style="bold green")
+           self.log_conversation("pentestGPT", "Thank you for using PentestGPT!")
         else:
            self.console.print("Please key in the correct options.", style="bold red")
-           response = self.input_handler()
-
+           self.log_conversation("pentestGPT", "Please key in the correct options.")
+           response = "Please key in the correct options."
         return response
 
     def main(self):
@@ -268,6 +339,7 @@
             "Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
             multiline=False,
         )
+        self.log_conversation("user", init_description)
         ## Provide the information to the reasoning session for the task initialization.
         prefixed_init_description = self.prompts.task_description + init_description
         with self.console.status(
@@ -285,8 +357,14 @@
             "PentestGPT suggests you to do the following: ", style="bold green"
         )
         self.console.print(_response)
+        self.log_conversation(
+            "pentestGPT", "PentestGPT suggests you to do the following: \n" + _response
+        )
         self.console.print("You may start with:", style="bold green")
         self.console.print(first_generation_response)
+        self.log_conversation(
+            "pentestGPT", "You may start with: \n" + first_generation_response
+        )
 
         # 4. enter the main loop.
         while True:
@@ -299,6 +377,11 @@
 
         # Summarize the session and end
         # TODO.
+        # log the session.
+        ## save self.history into a json file named by timestamp
+        log_name = "pentestGPT_log_" + dt.datetime.now().strftime("%Y%m%d_%H%M%S") + ".json"
+        with open(log_name, "w") as f:
+            json.dump(self.history, f)
 
         # clear the sessions
         # TODO.
diff --git a/utils/web_parser.py b/utils/web_parser.py
index 3dd7c9d..5d0ad8e 100644
--- a/utils/web_parser.py
+++ b/utils/web_parser.py
@@ -124,6 +124,7 @@ def google_search(keyword, num_results=5) -> dict:
     for url in search(keyword, tld="com", num=num_results, stop=num_results, pause=2):
         search_result[url] = parse_web(url)
     result = {"keyword": keyword, "search_result": search_result}
+    return result
 
 
 if __name__ == "__main__":
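
Reviewer note (not part of the diff): the new logging in utils/pentest_gpt.py dumps self.history to a pentestGPT_log_*.json file at the end of main(). A minimal sketch for inspecting such a log is shown below; it assumes only the structure built in this diff (a dict mapping each source to a list of (timestamp, text) pairs, which json.dump serializes as two-element lists). The script itself is illustrative and not part of the PR.

# Illustrative sketch: pretty-print pentestGPT_log_*.json files written by pentestGPT.main().
# Assumes the history structure from this diff: {source: [[timestamp, text], ...], ...}
import glob
import json
from datetime import datetime

for path in sorted(glob.glob("pentestGPT_log_*.json")):
    print(f"=== {path} ===")
    with open(path) as f:
        history = json.load(f)
    for source, entries in history.items():
        for timestamp, text in entries:
            when = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{when}] {source}: {text}")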
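Reviewer note (not part of the diff): with the added `return result` in utils/web_parser.py, google_search() now actually returns the dict it builds. A minimal usage sketch under the assumptions visible in the hunk (return shape {"keyword": ..., "search_result": {url: parsed_text}}; the query string below is a made-up example; a working googlesearch backend and network access are required):

# Illustrative sketch: consume the dict returned by the fixed google_search().
from utils.web_parser import google_search

result = google_search("example search query", num_results=3)  # hypothetical query
print("keyword:", result["keyword"])
for url, content in result["search_result"].items():
    # parse_web() supplies the extracted page text; show a short preview per URL
    print(url, "->", content[:80] if content else "<no content>")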