diff --git a/__init__.py b/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tasks/example_chatgpt_api.py b/example_chatgpt_api.py similarity index 100% rename from tasks/example_chatgpt_api.py rename to example_chatgpt_api.py diff --git a/example_sqlmap.py b/example_sqlmap.py new file mode 100644 index 0000000..c2178d6 --- /dev/null +++ b/example_sqlmap.py @@ -0,0 +1,53 @@ +from chatgpt_wrapper import ChatGPT +import os +from task_handle.template import chatGPTTemplate +from task_handle.custom_exceptions import NoCodeFromResponseException + +# the pre-defined variables +init_script = """ +I want your help to with the SQL injection vulnerability detection with sqlmap. +I'll give you the website, and you return the the command to execute in the terminal. +I'll update you with the output from sqlmap, and you return the next command to execute. +We repeat the process until a vulnerability is detected. +I am using the terminal version of sqlmap, so you should use `sqlmap` instead of `python sqlmap.py`. +Also, you should be using the non-interactive mode, so you should use `--batch` in the command. +Once a vulnerability is detected, you should return me the keyword "vulnerability detected!!!". +""" + +keyword = "vulnerability detected!!!" +prefix = "The output from terminal is :\n" + +class sqlmapHandler(chatGPTTemplate): + # should override the run function + def run(self): + self.initialize() + response = self.ask("Now please start, the website is: http://testphp.vulnweb.com/listproducts.php?cat=1") + while True: + # get the response from the bot + # if the keyword is detected, break the loop + if keyword in response: + break + # extract the command + try: + command = self._extract_command(str(response)) + # execute the command + output = self._cmd_wrapper(command) + # print the output + print("The output from terminal is :\n", output) + # feed the output to the bot + response = self.ask(output, need_prefix=True) + except NoCodeFromResponseException as e: + output = """ + No code is found in the response. Could you confirm the vulnerability is detected? + If so, please return the keyword "vulnerability detected!!!" to me. Otherwise, please return the next command to execute.""" + # feed the output to the bot + response = self.ask(output, need_prefix=True) + +if __name__ == "__main__": + #1. init the bot session + bot = ChatGPT() + chat_handler = sqlmapHandler(bot, init_script=init_script) + chat_handler._update_prefix(prefix) + + #2. run the chat + chat_handler.run() diff --git a/requirements.txt b/requirements.txt index 4da4cd1..ae93c5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests pyyaml playwright==1.28.0 +sqlmap diff --git a/task_handle/cmd_execution.py b/task_handle/cmd_execution.py index 38a06cc..82716f9 100644 --- a/task_handle/cmd_execution.py +++ b/task_handle/cmd_execution.py @@ -1,5 +1,5 @@ -import os +import os, subprocess def execute_cmd(cmd:str) -> str: """ @@ -15,13 +15,20 @@ def execute_cmd(cmd:str) -> str: output: str The output of the command. """ - # execute the command in the mac terminal. - # Alert!! The execution should be really careful, because it can be dangerous. - # It is recommended to use this in a sandbox environment. try: - output = os.popen(cmd).read() + # execute the command in the system terminal + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr = None, shell=True) + output = "" + # some tools may take time to execute. Wait until the output is finished. + while True: + line_output = p.stdout.readline() + if line_output: + output += line_output.decode("utf-8") + if line_output == b'' and p.poll() is not None: + break + + return output except Exception as e: - print("Error when executing command %s in terminal:" % cmd ) - print("Error is:", e) - return False - return output \ No newline at end of file + print("log: Error in executing the command:", cmd) + print("Error in executing the command:", e) + return None \ No newline at end of file diff --git a/task_handle/custom_exceptions.py b/task_handle/custom_exceptions.py new file mode 100644 index 0000000..0da2799 --- /dev/null +++ b/task_handle/custom_exceptions.py @@ -0,0 +1,4 @@ +# declare the custom types of exceptions + +class NoCodeFromResponseException(Exception): + pass \ No newline at end of file diff --git a/task_handle/template.py b/task_handle/template.py new file mode 100644 index 0000000..9077586 --- /dev/null +++ b/task_handle/template.py @@ -0,0 +1,155 @@ +from chatgpt_wrapper import ChatGPT +from task_handle.cmd_execution import execute_cmd +import os, logging, re +from .custom_exceptions import NoCodeFromResponseException + + +class chatGPTTemplate(): + """ + A template for the chatGPT task. + It contains the basic functions that are required for the task. + """ + + def __init__(self, bot_session, init_script=None): + """ + Initialize the by taking the session + The bot session is a standard chatgpt_wrapper bot session. + More details at https://github.com/mmabrouk/chatgpt-wrapper + + Parameters: + + Variables: + + Returns: + + """ + + ## Default storage variable + self._chat_history = [] # Ask, Answer, Ask, Answer, ... + self.logger = logging.getLogger() + logging.basicConfig(level=logging.INFO) + + ## Define all the variables + self._bot_session = bot_session + self._init_script = init_script + self._prefix = None + self._exception_ask = {} + + def _extract_command(self, response: str) -> str: + """ + This function is used to extract the command from the response. + Parameters: + response (str): The response from the bot. + Returns: + command (str): The command to be executed. + """ + try: + code_count = response.count("```") + if code_count == 0: + raise NoCodeFromResponseException("No code is found in the response.") + elif code_count % 2 == 1: + raise ValueError("The number of ``` is not even.") + # Extract the command from the response. + result_list = re.findall(r"```(.+?)```", response, re.DOTALL) + if len(result_list) > 1: + raise ValueError("More than one command is found.") + except Exception: # Nonetype, nothing found + raise NoCodeFromResponseException("No code is found in the response.") + + result = result_list[0] + if result[0] == "\n": # If the command starts with a newline, remove it. + result = result[1:] + + return result + + def _cmd_wrapper(self, cmd: str) -> str: + """ + This function is used to wrap the command execution function. + Parameters: + cmd (str): The command to be executed. + Returns: + output (str): The output of the command, or an Exception + """ + # the possible types of exceptions + output = execute_cmd(cmd) + return output + + + def _update_init_script(self, init_script: str): + """ + This function is used to update the initialization script. + Parameters: + init_script (str): The initialization script. + Returns: + """ + self._init_script = init_script + + def _update_prefix(self, prefix: str): + """ + This function is used to update the prefix. + Parameters: + prefix (str): The prefix to be appended. + Returns: + """ + self._prefix = prefix + + def _append_prefix(self, question: str, prefix: str): + """ + This function is used to append the prefix to the question to ask the bot. + Parameters: + prefix (str): The prefix to be appended. + Returns: + """ + return prefix + question + + + ########## Implementations ########## + + def initialize(self): + """ + This function is called when the task is initialized. + This is used to provide the necessary information for the task. + """ + if self._init_script is not None: + self._bot_session.ask(self._init_script) + + def ask(self, question: str, need_prefix = False) -> str: + """ + Wrap the default bot ask function. + Parameters: + question (str): The question to ask the bot. + Returns: + response (str): The response from the bot. + """ + if need_prefix: + question = self._append_prefix(question, self._prefix) + try: + self.logger.info("Asking the question: \n%s \n------------" % question) + response = self._bot_session.ask(question) + self.logger.info("The response is: \n%s \n------------" % response) + self._chat_history.append(question) + self._chat_history.append(response) + return response + except Exception as e: + print("Error in asking the question:", e) + return None + + def exception_ask(self, question: str) -> str: + """ + This function is used to ask the bot when an exception is raised. + Parameters: + question (str): The question to ask the bot. + Returns: + response (str): The response from the bot. + """ + if self._exception_ask is not None: + return self.ask(self._exception_ask) + else: + return None + + def run(self): + """ + The function with the main logic. This should be overwritten in the task execution. + """ + print("Please override the run function!") + pass \ No newline at end of file diff --git a/tasks/ROS_bug_classification b/tasks/ROS_bug_classification deleted file mode 100644 index e69de29..0000000 diff --git a/test_os_execution.py b/test_os_execution.py new file mode 100644 index 0000000..90b8256 --- /dev/null +++ b/test_os_execution.py @@ -0,0 +1,21 @@ +# just a trial script to test the os module +import subprocess + +# use sqlmap in the terminal + +cmd = 'sqlmap -u "http://testphp.vulnweb.com/listproducts.php?cat=1" --batch --level=5 --risk=3' + +# execute the command +p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr = None, shell=True) +output_str = "" +while True: + output = p.stdout.readline() + if output: + print(output.decode("utf-8"), end="") + output_str += output.decode("utf-8") + if output == b'' and p.poll() is not None: + print("------end of output------") + break + +print(output_str) + \ No newline at end of file