Merge pull request #15 from GreyDGL/log

feat: 🎸 Add logging function
This commit is contained in:
Grey_D 2023-04-22 11:57:09 +08:00 committed by GitHub
commit 6310cbb5cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 16 deletions

View File

@ -1,5 +1,7 @@
# PentestGPT
**We're testing PentestGPT on HackTheBox**. Follow
## Introduction
**PentestGPT** is a penetration testing tool empowered by **ChatGPT**. It is designed to automate the penetration testing process. It is built on top of ChatGPT and operate in an interactive mode to guide penetration testers in both overall progress and specific operations.
A sample testing process of **PentestGPT** on a target VulnHub machine (Hackable II) is available at [here](./resources/PentestGPT_Hackable2.pdf).

View File

@ -10,4 +10,5 @@ colorama
rich
prompt-toolkit
google
pytest
pytest
openai

View File

@ -68,9 +68,9 @@ class ChatGPT:
self.conversation_dict: Dict[str, Conversation] = {}
self.headers = dict(
{
"cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
# "cookie": self.config.cookie,
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
# "cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
"cookie": self.config.cookie,
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"
# 'Content-Type': 'text/event-stream; charset=utf-8',
}
@ -79,6 +79,8 @@ class ChatGPT:
def get_authorization(self):
url = "https://chat.openai.com/api/auth/session"
if "cookie" in vars(self.config):
self.headers["cookie"] = self.config.cookie
r = requests.get(url, headers=self.headers)
authorization = r.json()["accessToken"]
# authorization = self.config.accessToken

View File

@ -1,3 +1,4 @@
# This file is deprecated. It is not used in the project.
# -*- coding: utf-8 -*-
import json
@ -5,12 +6,13 @@ import re
import time
from uuid import uuid1
import datetime
from chatgpt_wrapper import OpenAIAPI
import loguru
import requests
from chatgpt_wrapper import ChatGPT
from chatgpt_wrapper.config import Config
from config.chatgpt_config import ChatGPTConfig
logger = loguru.logger
@ -22,7 +24,7 @@ class ChatGPTBrowser:
"""
def __init__(self, model=None):
config = Config()
config = ChatGPTConfig()
if model is not None:
config.set("chat.model", model)
self.bot = ChatGPT(config)
@ -48,6 +50,7 @@ class ChatGPTBrowser:
def send_message(self, message, conversation_id):
# 发送会话窗口消息
# TODO: send message from browser
# check here: https://github.com/mmabrouk/chatgpt-wrapper/blob/bafd0be7fb3355ea4a4b0276ade9f0fc6e8571fd/chatgpt_wrapper/backends/openai/repl.py#L101
return
def extract_code_fragments(self, text):
@ -61,7 +64,14 @@ class ChatGPTBrowser:
if __name__ == "__main__":
chatgptBrowser_session = ChatGPTBrowser()
text, conversation_id = chatgptBrowser_session.send_new_message(
"I am a new tester for RESTful APIs."
)
# chatgptBrowser_session = ChatGPTBrowser()
# text, conversation_id = chatgptBrowser_session.send_new_message(
# "I am a new tester for RESTful APIs."
# )
bot = OpenAIAPI()
success, response, message = bot.ask("Hello, world!")
if success:
print(response)
else:
raise RuntimeError(message)

View File

@ -8,9 +8,11 @@ from utils.prompt_select import prompt_select, prompt_ask
from prompt_toolkit.formatted_text import HTML
from utils.task_handler import main_task_entry, mainTaskCompleter
from utils.web_parser import google_search, parse_web
import time
import datetime as dt
import loguru
import time, os, textwrap
import time, os, textwrap, json
logger = loguru.logger
logger.add(sink="logs/pentest_gpt.log")
@ -53,6 +55,32 @@ class pentestGPT:
self.step_reasoning = (
None # the response from the reasoning session for the current step
)
self.history = {
"user": [],
"pentestGPT": [],
"reasoning": [],
"input_parsing": [],
"generation": [],
"exception": [],
} # the history of the current conversation
def log_conversation(self, source, text):
"""
append the conversation into the history
Parameters:
----------
source: str
the source of the conversation
text: str
the content of the conversation
"""
# append the conversation into the history
timestamp = time.time()
if source not in self.history.keys():
# an exception
source = "exception"
self.history[source].append((timestamp, text))
def initialize(self):
# initialize the backbone sessions and test the connection to chatGPT
@ -89,6 +117,8 @@ class pentestGPT:
response = self.chatGPT4Agent.send_message(
self.prompts.process_results + text, self.test_reasoning_session_id
)
# log the conversation
self.log_conversation("reasoning", response)
return response
def input_parsing_handler(self, text, source=None) -> str:
@ -110,12 +140,15 @@ class pentestGPT:
summarized_content += self.chatGPTAgent.send_message(
prefix + word_limit + wrapped_input, self.input_parsing_session_id
)
# log the conversation
self.log_conversation("input_parsing", summarized_content)
return summarized_content
def test_generation_handler(self, text):
# send the contents to chatGPT test_generation_session and obtain the results
response = self.chatGPTAgent.send_message(text, self.test_generation_session_id)
# print the results
# log the conversation
self.log_conversation("generation", response)
return response
def input_handler(self) -> str:
@ -131,6 +164,7 @@ class pentestGPT:
self.chat_count += 1
request_option = main_task_entry()
self.log_conversation("user", request_option)
# request_option = prompt_select(
# title=f"({self.chat_count}) > Please select your options with cursor: ",
# values=[
@ -141,6 +175,7 @@ class pentestGPT:
# ],
# )
# pass output
if request_option == "help":
print(mainTaskCompleter().task_details)
@ -159,6 +194,9 @@ class pentestGPT:
"Your input: (End with <shift + right-arrow>)", style="bold green"
)
user_input = prompt_ask("> ", multiline=True)
self.log_conversation(
"user_input", "Source: " + options[int(source)] + "\n" + user_input
)
with self.console.status("[bold green] PentestGPT Thinking...") as status:
parsed_input = self.input_parsing_handler(
user_input, source=options[int(source)]
@ -173,6 +211,11 @@ class pentestGPT:
style="bold green",
)
self.console.print(reasoning_response + "\n")
self.log_conversation(
"pentestGPT",
"Based on the analysis, the following tasks are recommended:"
+ reasoning_response,
)
response = reasoning_response
# generate more test details (beginner mode)
@ -183,7 +226,9 @@ class pentestGPT:
"You have not initialized the task yet. Please perform the basic testing following `next` option.",
style="bold red",
)
return
response = "You have not initialized the task yet. Please perform the basic testing following `next` option."
self.log_conversation("pentestGPT", response)
return response
with self.console.status("[bold green] PentestGPT Thinking...") as status:
generation_response = self.test_generation_handler(
self.step_reasoning_response
@ -195,6 +240,7 @@ class pentestGPT:
)
self.console.print(generation_response + "\n")
response = generation_response
self.log_conversation("pentestGPT", response)
# ask for task list (to-do list)
elif request_option == "todo":
@ -216,20 +262,33 @@ class pentestGPT:
)
self.console.print(generation_response + "\n")
response = reasoning_response
self.log_conversation(
"pentestGPT",
"Based on the analysis, the following tasks are recommended:"
+ reasoning_response
+ "\n"
+ "You can follow the instructions below to complete the tasks."
+ generation_response,
)
# pass other information, such as questions or some observations.
elif request_option == "discuss":
## (1) Request for user multi-line input
self.console.print("Please share your thoughts/questions with PentestGPT.")
self.log_conversation(
"pentestGPT", "Please share your thoughts/questions with PentestGPT."
)
user_input = prompt_ask(
"(End with <shift + right-arrow>) Your input: ", multiline=True
)
self.log_conversation("user_input", user_input)
## (2) pass the information to the reasoning session.
with self.console.status("[bold green] PentestGPT Thinking...") as status:
response = self.reasoning_handler(self.prompts.discussion + user_input)
## (3) print the results
self.console.print("PentestGPT:\n", style="bold green")
self.console.print(response + "\n", style="yellow")
self.log_conversation("pentestGPT", response)
# Google
elif request_option == "google":
@ -238,22 +297,34 @@ class pentestGPT:
"Please enter your search query. PentestGPT will summarize the info from google.",
style="bold green",
)
self.log_conversation(
"pentestGPT",
"Please enter your search query. PentestGPT will summarize the info from google.",
)
user_input = prompt_ask(
"(End with <shift + right-arrow>) Your input: ", multiline=False
)
self.log_conversation("user_input", user_input)
with self.console.status("[bold green] PentestGPT Thinking...") as status:
# query the question
result = self.google_search(user_input, 5) # 5 results by default
result: dict = google_search(user_input, 5) # 5 results by default
# summarize the results
# TODO
response = "Google search results:\n" + "still under development."
self.console.print(response + "\n", style="yellow")
self.log_conversation("pentestGPT", response)
return response
# end
elif request_option == "quit":
response = False
self.console.print("Thank you for using PentestGPT!", style="bold green")
self.log_conversation("pentestGPT", "Thank you for using PentestGPT!")
else:
self.console.print("Please key in the correct options.", style="bold red")
response = self.input_handler()
self.log_conversation("pentestGPT", "Please key in the correct options.")
response = "Please key in the correct options."
return response
def main(self):
@ -268,6 +339,7 @@ class pentestGPT:
"Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
multiline=False,
)
self.log_conversation("user", init_description)
## Provide the information to the reasoning session for the task initialization.
prefixed_init_description = self.prompts.task_description + init_description
with self.console.status(
@ -285,8 +357,14 @@ class pentestGPT:
"PentestGPT suggests you to do the following: ", style="bold green"
)
self.console.print(_response)
self.log_conversation(
"PentestGPT", "PentestGPT suggests you to do the following: \n" + _response
)
self.console.print("You may start with:", style="bold green")
self.console.print(first_generation_response)
self.log_conversation(
"PentestGPT", "You may start with: \n" + first_generation_response
)
# 4. enter the main loop.
while True:
@ -299,6 +377,11 @@ class pentestGPT:
# Summarize the session and end
# TODO.
# log the session.
## save self.history into a txt file based on timestamp
log_name = "pentestGPT_log_" + dt.now().strftime("%Y%m%d_%H%M%S") + ".json"
with open(log_name, "w") as f:
json.dump(self.history, f)
# clear the sessions
# TODO.

View File

@ -124,6 +124,7 @@ def google_search(keyword, num_results=5) -> dict:
for url in search(keyword, tld="com", num=num_results, stop=num_results, pause=2):
search_result[url] = parse_web(url)
result = {"keyword": keyword, "search_result": search_result}
return result
if __name__ == "__main__":