mirror of
https://github.com/weyne85/PentestGPT.git
synced 2025-10-29 16:58:59 +00:00
update sqlmap
This commit is contained in:
53
example_sqlmap.py
Normal file
53
example_sqlmap.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from chatgpt_wrapper import ChatGPT
|
||||
import os
|
||||
from task_handle.template import chatGPTTemplate
|
||||
from task_handle.custom_exceptions import NoCodeFromResponseException
|
||||
|
||||
# the pre-defined variables
|
||||
init_script = """
|
||||
I want your help to with the SQL injection vulnerability detection with sqlmap.
|
||||
I'll give you the website, and you return the the command to execute in the terminal.
|
||||
I'll update you with the output from sqlmap, and you return the next command to execute.
|
||||
We repeat the process until a vulnerability is detected.
|
||||
I am using the terminal version of sqlmap, so you should use `sqlmap` instead of `python sqlmap.py`.
|
||||
Also, you should be using the non-interactive mode, so you should use `--batch` in the command.
|
||||
Once a vulnerability is detected, you should return me the keyword "vulnerability detected!!!".
|
||||
"""
|
||||
|
||||
keyword = "vulnerability detected!!!"
|
||||
prefix = "The output from terminal is :\n"
|
||||
|
||||
class sqlmapHandler(chatGPTTemplate):
|
||||
# should override the run function
|
||||
def run(self):
|
||||
self.initialize()
|
||||
response = self.ask("Now please start, the website is: http://testphp.vulnweb.com/listproducts.php?cat=1")
|
||||
while True:
|
||||
# get the response from the bot
|
||||
# if the keyword is detected, break the loop
|
||||
if keyword in response:
|
||||
break
|
||||
# extract the command
|
||||
try:
|
||||
command = self._extract_command(str(response))
|
||||
# execute the command
|
||||
output = self._cmd_wrapper(command)
|
||||
# print the output
|
||||
print("The output from terminal is :\n", output)
|
||||
# feed the output to the bot
|
||||
response = self.ask(output, need_prefix=True)
|
||||
except NoCodeFromResponseException as e:
|
||||
output = """
|
||||
No code is found in the response. Could you confirm the vulnerability is detected?
|
||||
If so, please return the keyword "vulnerability detected!!!" to me. Otherwise, please return the next command to execute."""
|
||||
# feed the output to the bot
|
||||
response = self.ask(output, need_prefix=True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
#1. init the bot session
|
||||
bot = ChatGPT()
|
||||
chat_handler = sqlmapHandler(bot, init_script=init_script)
|
||||
chat_handler._update_prefix(prefix)
|
||||
|
||||
#2. run the chat
|
||||
chat_handler.run()
|
||||
@@ -1,3 +1,4 @@
|
||||
requests
|
||||
pyyaml
|
||||
playwright==1.28.0
|
||||
sqlmap
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
import os
|
||||
import os, subprocess
|
||||
|
||||
def execute_cmd(cmd:str) -> str:
|
||||
"""
|
||||
@@ -15,13 +15,20 @@ def execute_cmd(cmd:str) -> str:
|
||||
output: str
|
||||
The output of the command.
|
||||
"""
|
||||
# execute the command in the mac terminal.
|
||||
# Alert!! The execution should be really careful, because it can be dangerous.
|
||||
# It is recommended to use this in a sandbox environment.
|
||||
try:
|
||||
output = os.popen(cmd).read()
|
||||
# execute the command in the system terminal
|
||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr = None, shell=True)
|
||||
output = ""
|
||||
# some tools may take time to execute. Wait until the output is finished.
|
||||
while True:
|
||||
line_output = p.stdout.readline()
|
||||
if line_output:
|
||||
output += line_output.decode("utf-8")
|
||||
if line_output == b'' and p.poll() is not None:
|
||||
break
|
||||
|
||||
return output
|
||||
except Exception as e:
|
||||
print("Error when executing command %s in terminal:" % cmd )
|
||||
print("Error is:", e)
|
||||
return False
|
||||
return output
|
||||
print("log: Error in executing the command:", cmd)
|
||||
print("Error in executing the command:", e)
|
||||
return None
|
||||
4
task_handle/custom_exceptions.py
Normal file
4
task_handle/custom_exceptions.py
Normal file
@@ -0,0 +1,4 @@
|
||||
# declare the custom types of exceptions
|
||||
|
||||
class NoCodeFromResponseException(Exception):
|
||||
pass
|
||||
155
task_handle/template.py
Normal file
155
task_handle/template.py
Normal file
@@ -0,0 +1,155 @@
|
||||
from chatgpt_wrapper import ChatGPT
|
||||
from task_handle.cmd_execution import execute_cmd
|
||||
import os, logging, re
|
||||
from .custom_exceptions import NoCodeFromResponseException
|
||||
|
||||
|
||||
class chatGPTTemplate():
|
||||
"""
|
||||
A template for the chatGPT task.
|
||||
It contains the basic functions that are required for the task.
|
||||
"""
|
||||
|
||||
def __init__(self, bot_session, init_script=None):
|
||||
"""
|
||||
Initialize the by taking the session
|
||||
The bot session is a standard chatgpt_wrapper bot session.
|
||||
More details at https://github.com/mmabrouk/chatgpt-wrapper
|
||||
|
||||
Parameters:
|
||||
|
||||
Variables:
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
|
||||
## Default storage variable
|
||||
self._chat_history = [] # Ask, Answer, Ask, Answer, ...
|
||||
self.logger = logging.getLogger()
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
## Define all the variables
|
||||
self._bot_session = bot_session
|
||||
self._init_script = init_script
|
||||
self._prefix = None
|
||||
self._exception_ask = {}
|
||||
|
||||
def _extract_command(self, response: str) -> str:
|
||||
"""
|
||||
This function is used to extract the command from the response.
|
||||
Parameters:
|
||||
response (str): The response from the bot.
|
||||
Returns:
|
||||
command (str): The command to be executed.
|
||||
"""
|
||||
try:
|
||||
code_count = response.count("```")
|
||||
if code_count == 0:
|
||||
raise NoCodeFromResponseException("No code is found in the response.")
|
||||
elif code_count % 2 == 1:
|
||||
raise ValueError("The number of ``` is not even.")
|
||||
# Extract the command from the response.
|
||||
result_list = re.findall(r"```(.+?)```", response, re.DOTALL)
|
||||
if len(result_list) > 1:
|
||||
raise ValueError("More than one command is found.")
|
||||
except Exception: # Nonetype, nothing found
|
||||
raise NoCodeFromResponseException("No code is found in the response.")
|
||||
|
||||
result = result_list[0]
|
||||
if result[0] == "\n": # If the command starts with a newline, remove it.
|
||||
result = result[1:]
|
||||
|
||||
return result
|
||||
|
||||
def _cmd_wrapper(self, cmd: str) -> str:
|
||||
"""
|
||||
This function is used to wrap the command execution function.
|
||||
Parameters:
|
||||
cmd (str): The command to be executed.
|
||||
Returns:
|
||||
output (str): The output of the command, or an Exception
|
||||
"""
|
||||
# the possible types of exceptions
|
||||
output = execute_cmd(cmd)
|
||||
return output
|
||||
|
||||
|
||||
def _update_init_script(self, init_script: str):
|
||||
"""
|
||||
This function is used to update the initialization script.
|
||||
Parameters:
|
||||
init_script (str): The initialization script.
|
||||
Returns:
|
||||
"""
|
||||
self._init_script = init_script
|
||||
|
||||
def _update_prefix(self, prefix: str):
|
||||
"""
|
||||
This function is used to update the prefix.
|
||||
Parameters:
|
||||
prefix (str): The prefix to be appended.
|
||||
Returns:
|
||||
"""
|
||||
self._prefix = prefix
|
||||
|
||||
def _append_prefix(self, question: str, prefix: str):
|
||||
"""
|
||||
This function is used to append the prefix to the question to ask the bot.
|
||||
Parameters:
|
||||
prefix (str): The prefix to be appended.
|
||||
Returns:
|
||||
"""
|
||||
return prefix + question
|
||||
|
||||
|
||||
########## Implementations ##########
|
||||
|
||||
def initialize(self):
|
||||
"""
|
||||
This function is called when the task is initialized.
|
||||
This is used to provide the necessary information for the task.
|
||||
"""
|
||||
if self._init_script is not None:
|
||||
self._bot_session.ask(self._init_script)
|
||||
|
||||
def ask(self, question: str, need_prefix = False) -> str:
|
||||
"""
|
||||
Wrap the default bot ask function.
|
||||
Parameters:
|
||||
question (str): The question to ask the bot.
|
||||
Returns:
|
||||
response (str): The response from the bot.
|
||||
"""
|
||||
if need_prefix:
|
||||
question = self._append_prefix(question, self._prefix)
|
||||
try:
|
||||
self.logger.info("Asking the question: \n%s \n------------" % question)
|
||||
response = self._bot_session.ask(question)
|
||||
self.logger.info("The response is: \n%s \n------------" % response)
|
||||
self._chat_history.append(question)
|
||||
self._chat_history.append(response)
|
||||
return response
|
||||
except Exception as e:
|
||||
print("Error in asking the question:", e)
|
||||
return None
|
||||
|
||||
def exception_ask(self, question: str) -> str:
|
||||
"""
|
||||
This function is used to ask the bot when an exception is raised.
|
||||
Parameters:
|
||||
question (str): The question to ask the bot.
|
||||
Returns:
|
||||
response (str): The response from the bot.
|
||||
"""
|
||||
if self._exception_ask is not None:
|
||||
return self.ask(self._exception_ask)
|
||||
else:
|
||||
return None
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
The function with the main logic. This should be overwritten in the task execution.
|
||||
"""
|
||||
print("Please override the run function!")
|
||||
pass
|
||||
21
test_os_execution.py
Normal file
21
test_os_execution.py
Normal file
@@ -0,0 +1,21 @@
|
||||
# just a trial script to test the os module
|
||||
import subprocess
|
||||
|
||||
# use sqlmap in the terminal
|
||||
|
||||
cmd = 'sqlmap -u "http://testphp.vulnweb.com/listproducts.php?cat=1" --batch --level=5 --risk=3'
|
||||
|
||||
# execute the command
|
||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr = None, shell=True)
|
||||
output_str = ""
|
||||
while True:
|
||||
output = p.stdout.readline()
|
||||
if output:
|
||||
print(output.decode("utf-8"), end="")
|
||||
output_str += output.decode("utf-8")
|
||||
if output == b'' and p.poll() is not None:
|
||||
print("------end of output------")
|
||||
break
|
||||
|
||||
print(output_str)
|
||||
|
||||
Reference in New Issue
Block a user