mirror of
https://github.com/weyne85/PentestGPT.git
synced 2025-10-29 16:58:59 +00:00
feat: 🎸 Add logging function
This commit is contained in:
@@ -68,9 +68,9 @@ class ChatGPT:
|
||||
self.conversation_dict: Dict[str, Conversation] = {}
|
||||
self.headers = dict(
|
||||
{
|
||||
"cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
|
||||
# "cookie": self.config.cookie,
|
||||
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
|
||||
# "cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
|
||||
"cookie": self.config.cookie,
|
||||
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
|
||||
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"
|
||||
# 'Content-Type': 'text/event-stream; charset=utf-8',
|
||||
}
|
||||
@@ -79,6 +79,8 @@ class ChatGPT:
|
||||
|
||||
def get_authorization(self):
|
||||
url = "https://chat.openai.com/api/auth/session"
|
||||
if "cookie" in vars(self.config):
|
||||
self.headers["cookie"] = self.config.cookie
|
||||
r = requests.get(url, headers=self.headers)
|
||||
authorization = r.json()["accessToken"]
|
||||
# authorization = self.config.accessToken
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# This file is deprecated. It is not used in the project.
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
@@ -5,12 +6,13 @@ import re
|
||||
import time
|
||||
from uuid import uuid1
|
||||
import datetime
|
||||
from chatgpt_wrapper import OpenAIAPI
|
||||
|
||||
import loguru
|
||||
import requests
|
||||
|
||||
from chatgpt_wrapper import ChatGPT
|
||||
from chatgpt_wrapper.config import Config
|
||||
from config.chatgpt_config import ChatGPTConfig
|
||||
|
||||
logger = loguru.logger
|
||||
|
||||
@@ -22,7 +24,7 @@ class ChatGPTBrowser:
|
||||
"""
|
||||
|
||||
def __init__(self, model=None):
|
||||
config = Config()
|
||||
config = ChatGPTConfig()
|
||||
if model is not None:
|
||||
config.set("chat.model", model)
|
||||
self.bot = ChatGPT(config)
|
||||
@@ -48,6 +50,7 @@ class ChatGPTBrowser:
|
||||
def send_message(self, message, conversation_id):
|
||||
# 发送会话窗口消息
|
||||
# TODO: send message from browser
|
||||
# check here: https://github.com/mmabrouk/chatgpt-wrapper/blob/bafd0be7fb3355ea4a4b0276ade9f0fc6e8571fd/chatgpt_wrapper/backends/openai/repl.py#L101
|
||||
return
|
||||
|
||||
def extract_code_fragments(self, text):
|
||||
@@ -61,7 +64,14 @@ class ChatGPTBrowser:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
chatgptBrowser_session = ChatGPTBrowser()
|
||||
text, conversation_id = chatgptBrowser_session.send_new_message(
|
||||
"I am a new tester for RESTful APIs."
|
||||
)
|
||||
# chatgptBrowser_session = ChatGPTBrowser()
|
||||
# text, conversation_id = chatgptBrowser_session.send_new_message(
|
||||
# "I am a new tester for RESTful APIs."
|
||||
# )
|
||||
|
||||
bot = OpenAIAPI()
|
||||
success, response, message = bot.ask("Hello, world!")
|
||||
if success:
|
||||
print(response)
|
||||
else:
|
||||
raise RuntimeError(message)
|
||||
|
||||
@@ -8,9 +8,11 @@ from utils.prompt_select import prompt_select, prompt_ask
|
||||
from prompt_toolkit.formatted_text import HTML
|
||||
from utils.task_handler import main_task_entry, mainTaskCompleter
|
||||
from utils.web_parser import google_search, parse_web
|
||||
import time
|
||||
import datetime as dt
|
||||
|
||||
import loguru
|
||||
import time, os, textwrap
|
||||
import time, os, textwrap, json
|
||||
|
||||
logger = loguru.logger
|
||||
logger.add(sink="logs/pentest_gpt.log")
|
||||
@@ -53,6 +55,32 @@ class pentestGPT:
|
||||
self.step_reasoning = (
|
||||
None # the response from the reasoning session for the current step
|
||||
)
|
||||
self.history = {
|
||||
"user": [],
|
||||
"pentestGPT": [],
|
||||
"reasoning": [],
|
||||
"input_parsing": [],
|
||||
"generation": [],
|
||||
"exception": [],
|
||||
} # the history of the current conversation
|
||||
|
||||
def log_conversation(self, source, text):
|
||||
"""
|
||||
append the conversation into the history
|
||||
|
||||
Parameters:
|
||||
----------
|
||||
source: str
|
||||
the source of the conversation
|
||||
text: str
|
||||
the content of the conversation
|
||||
"""
|
||||
# append the conversation into the history
|
||||
timestamp = time.time()
|
||||
if source not in self.history.keys():
|
||||
# an exception
|
||||
source = "exception"
|
||||
self.history[source].append((timestamp, text))
|
||||
|
||||
def initialize(self):
|
||||
# initialize the backbone sessions and test the connection to chatGPT
|
||||
@@ -89,6 +117,8 @@ class pentestGPT:
|
||||
response = self.chatGPT4Agent.send_message(
|
||||
self.prompts.process_results + text, self.test_reasoning_session_id
|
||||
)
|
||||
# log the conversation
|
||||
self.log_conversation("reasoning", response)
|
||||
return response
|
||||
|
||||
def input_parsing_handler(self, text, source=None) -> str:
|
||||
@@ -110,12 +140,15 @@ class pentestGPT:
|
||||
summarized_content += self.chatGPTAgent.send_message(
|
||||
prefix + word_limit + wrapped_input, self.input_parsing_session_id
|
||||
)
|
||||
# log the conversation
|
||||
self.log_conversation("input_parsing", summarized_content)
|
||||
return summarized_content
|
||||
|
||||
def test_generation_handler(self, text):
|
||||
# send the contents to chatGPT test_generation_session and obtain the results
|
||||
response = self.chatGPTAgent.send_message(text, self.test_generation_session_id)
|
||||
# print the results
|
||||
# log the conversation
|
||||
self.log_conversation("generation", response)
|
||||
return response
|
||||
|
||||
def input_handler(self) -> str:
|
||||
@@ -131,6 +164,7 @@ class pentestGPT:
|
||||
self.chat_count += 1
|
||||
|
||||
request_option = main_task_entry()
|
||||
self.log_conversation("user", request_option)
|
||||
# request_option = prompt_select(
|
||||
# title=f"({self.chat_count}) > Please select your options with cursor: ",
|
||||
# values=[
|
||||
@@ -141,6 +175,7 @@ class pentestGPT:
|
||||
# ],
|
||||
# )
|
||||
# pass output
|
||||
|
||||
if request_option == "help":
|
||||
print(mainTaskCompleter().task_details)
|
||||
|
||||
@@ -159,6 +194,9 @@ class pentestGPT:
|
||||
"Your input: (End with <shift + right-arrow>)", style="bold green"
|
||||
)
|
||||
user_input = prompt_ask("> ", multiline=True)
|
||||
self.log_conversation(
|
||||
"user_input", "Source: " + options[int(source)] + "\n" + user_input
|
||||
)
|
||||
with self.console.status("[bold green] PentestGPT Thinking...") as status:
|
||||
parsed_input = self.input_parsing_handler(
|
||||
user_input, source=options[int(source)]
|
||||
@@ -173,6 +211,11 @@ class pentestGPT:
|
||||
style="bold green",
|
||||
)
|
||||
self.console.print(reasoning_response + "\n")
|
||||
self.log_conversation(
|
||||
"pentestGPT",
|
||||
"Based on the analysis, the following tasks are recommended:"
|
||||
+ reasoning_response,
|
||||
)
|
||||
response = reasoning_response
|
||||
|
||||
# generate more test details (beginner mode)
|
||||
@@ -183,7 +226,9 @@ class pentestGPT:
|
||||
"You have not initialized the task yet. Please perform the basic testing following `next` option.",
|
||||
style="bold red",
|
||||
)
|
||||
return
|
||||
response = "You have not initialized the task yet. Please perform the basic testing following `next` option."
|
||||
self.log_conversation("pentestGPT", response)
|
||||
return response
|
||||
with self.console.status("[bold green] PentestGPT Thinking...") as status:
|
||||
generation_response = self.test_generation_handler(
|
||||
self.step_reasoning_response
|
||||
@@ -195,6 +240,7 @@ class pentestGPT:
|
||||
)
|
||||
self.console.print(generation_response + "\n")
|
||||
response = generation_response
|
||||
self.log_conversation("pentestGPT", response)
|
||||
|
||||
# ask for task list (to-do list)
|
||||
elif request_option == "todo":
|
||||
@@ -216,20 +262,33 @@ class pentestGPT:
|
||||
)
|
||||
self.console.print(generation_response + "\n")
|
||||
response = reasoning_response
|
||||
self.log_conversation(
|
||||
"pentestGPT",
|
||||
"Based on the analysis, the following tasks are recommended:"
|
||||
+ reasoning_response
|
||||
+ "\n"
|
||||
+ "You can follow the instructions below to complete the tasks."
|
||||
+ generation_response,
|
||||
)
|
||||
|
||||
# pass other information, such as questions or some observations.
|
||||
elif request_option == "discuss":
|
||||
## (1) Request for user multi-line input
|
||||
self.console.print("Please share your thoughts/questions with PentestGPT.")
|
||||
self.log_conversation(
|
||||
"pentestGPT", "Please share your thoughts/questions with PentestGPT."
|
||||
)
|
||||
user_input = prompt_ask(
|
||||
"(End with <shift + right-arrow>) Your input: ", multiline=True
|
||||
)
|
||||
self.log_conversation("user_input", user_input)
|
||||
## (2) pass the information to the reasoning session.
|
||||
with self.console.status("[bold green] PentestGPT Thinking...") as status:
|
||||
response = self.reasoning_handler(self.prompts.discussion + user_input)
|
||||
## (3) print the results
|
||||
self.console.print("PentestGPT:\n", style="bold green")
|
||||
self.console.print(response + "\n", style="yellow")
|
||||
self.log_conversation("pentestGPT", response)
|
||||
|
||||
# Google
|
||||
elif request_option == "google":
|
||||
@@ -238,22 +297,34 @@ class pentestGPT:
|
||||
"Please enter your search query. PentestGPT will summarize the info from google.",
|
||||
style="bold green",
|
||||
)
|
||||
self.log_conversation(
|
||||
"pentestGPT",
|
||||
"Please enter your search query. PentestGPT will summarize the info from google.",
|
||||
)
|
||||
user_input = prompt_ask(
|
||||
"(End with <shift + right-arrow>) Your input: ", multiline=False
|
||||
)
|
||||
self.log_conversation("user_input", user_input)
|
||||
with self.console.status("[bold green] PentestGPT Thinking...") as status:
|
||||
# query the question
|
||||
result = self.google_search(user_input, 5) # 5 results by default
|
||||
result: dict = google_search(user_input, 5) # 5 results by default
|
||||
# summarize the results
|
||||
# TODO
|
||||
response = "Google search results:\n" + "still under development."
|
||||
self.console.print(response + "\n", style="yellow")
|
||||
self.log_conversation("pentestGPT", response)
|
||||
return response
|
||||
|
||||
# end
|
||||
elif request_option == "quit":
|
||||
response = False
|
||||
self.console.print("Thank you for using PentestGPT!", style="bold green")
|
||||
self.log_conversation("pentestGPT", "Thank you for using PentestGPT!")
|
||||
|
||||
else:
|
||||
self.console.print("Please key in the correct options.", style="bold red")
|
||||
response = self.input_handler()
|
||||
|
||||
self.log_conversation("pentestGPT", "Please key in the correct options.")
|
||||
response = "Please key in the correct options."
|
||||
return response
|
||||
|
||||
def main(self):
|
||||
@@ -268,6 +339,7 @@ class pentestGPT:
|
||||
"Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
|
||||
multiline=False,
|
||||
)
|
||||
self.log_conversation("user", init_description)
|
||||
## Provide the information to the reasoning session for the task initialization.
|
||||
prefixed_init_description = self.prompts.task_description + init_description
|
||||
with self.console.status(
|
||||
@@ -285,8 +357,14 @@ class pentestGPT:
|
||||
"PentestGPT suggests you to do the following: ", style="bold green"
|
||||
)
|
||||
self.console.print(_response)
|
||||
self.log_conversation(
|
||||
"PentestGPT", "PentestGPT suggests you to do the following: \n" + _response
|
||||
)
|
||||
self.console.print("You may start with:", style="bold green")
|
||||
self.console.print(first_generation_response)
|
||||
self.log_conversation(
|
||||
"PentestGPT", "You may start with: \n" + first_generation_response
|
||||
)
|
||||
|
||||
# 4. enter the main loop.
|
||||
while True:
|
||||
@@ -299,6 +377,11 @@ class pentestGPT:
|
||||
|
||||
# Summarize the session and end
|
||||
# TODO.
|
||||
# log the session.
|
||||
## save self.history into a txt file based on timestamp
|
||||
log_name = "pentestGPT_log_" + dt.now().strftime("%Y%m%d_%H%M%S") + ".json"
|
||||
with open(log_name, "w") as f:
|
||||
json.dump(self.history, f)
|
||||
|
||||
# clear the sessions
|
||||
# TODO.
|
||||
|
||||
@@ -124,6 +124,7 @@ def google_search(keyword, num_results=5) -> dict:
|
||||
for url in search(keyword, tld="com", num=num_results, stop=num_results, pause=2):
|
||||
search_result[url] = parse_web(url)
|
||||
result = {"keyword": keyword, "search_result": search_result}
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user