From 10065c206956e26718fb90798f256d94b33cf706 Mon Sep 17 00:00:00 2001 From: GeekMasher Date: Tue, 25 Jul 2023 21:02:36 +0100 Subject: [PATCH] feat: update codeql --- src/ghastoolkit/codeql/cli.py | 101 ++++++++++++++++++++++----- src/ghastoolkit/codeql/packs/pack.py | 16 ++++- 2 files changed, 97 insertions(+), 20 deletions(-) diff --git a/src/ghastoolkit/codeql/cli.py b/src/ghastoolkit/codeql/cli.py index 8e45e319..93a998e6 100644 --- a/src/ghastoolkit/codeql/cli.py +++ b/src/ghastoolkit/codeql/cli.py @@ -1,5 +1,6 @@ """This is the CodeQL CLI Module.""" import os +import csv import json import logging import tempfile @@ -52,8 +53,7 @@ def runCommand(self, *argvs, display: bool = False) -> Optional[str]: cmd.extend(self.path_binary) cmd.extend(argvs) - if argvs[0] == "database": - cmd.extend(["--threads", "0", "--ram", "0"]) + logger.debug(f"Running Command :: {cmd}") if display: subprocess.check_output(cmd) @@ -64,39 +64,80 @@ def runCommand(self, *argvs, display: bool = False) -> Optional[str]: @property def version(self) -> str: """Get CodeQL Version from the CLI binary.""" - version = self.runCommand("version", "--format", "terse", display=True) + version = self.runCommand("version", "--format", "terse") if not version: raise Exception("CodeQL version not found") return version def runQuery( - self, database: CodeQLDatabase, path: Optional[str] = None + self, + database: CodeQLDatabase, + path: Optional[str] = None, + display: bool = False, ) -> CodeQLResults: """Run a CodeQL Query on a CodeQL Database.""" if not database.path: raise Exception("CodeQL Database path is not set") path = path or database.default_pack + logger.debug(f"Query path :: {path}") + + self.runCommand( + "database", + "run-queries", + database.path, + path, + display=display, + ) + if path.endswith(".ql"): + return self.getResults(database, path) + return self.getResults(database) - self.runCommand("database", "run-queries", database.path, path) - return self.getResults(database, path) + def runQueryWithParameters(self, database: CodeQLDatabase, path: str, **kwargs): + """Run a CodeQL query with parameters.""" + return def runRawQuery( - self, path: str, database: CodeQLDatabase, outputtype: str = "bqrs" - ) -> Union[dict, list]: + self, + path: str, + database: CodeQLDatabase, + display: bool = False, + outputtype: str = "sarif", + ) -> Union[list, CodeQLResults]: """Run raw query on a CodeQL Database.""" if not database.path: raise Exception("CodeQL Database path is not set") if not path.endswith(".ql"): raise Exception("runRawQuery requires a QL file") - self.runCommand("database", "run-queries", database.path, path) + self.runCommand("database", "run-queries", database.path, path, display=display) + + from ghastoolkit.codeql.packs.pack import CodeQLPack + + if ":" in path: + logger.debug("Running in pack mode") + pack_name, query_path = path.split(":", 1) + else: + logger.debug("Running in path mode") + + pack = CodeQLPack.findByQuery(path) + pack_name = pack.name + query_path = path.replace(pack.path + "/", "") + + logger.debug(f"Pack Name for query :: {pack_name} -> {query_path}") + + bqrs_query_path = query_path.replace(".ql", ".bqrs") + bqrs = os.path.join(database.path, "results", pack_name, bqrs_query_path) + + logger.debug(f"BQRS File location :: {bqrs}") + if outputtype == "bqrs": - bqrs = os.path.join( - database.path, "results", path.replace(":", "/").replace(".ql", ".bqrs") - ) - return self.readBqrs(bqrs) - return {} + if not os.path.exists(bqrs): + raise Exception(f"BQRS file does not exist") + return self.readBqrs(bqrs, display=display) + elif outputtype == "sarif": + return self.getResults(database, path) + return [] def getResults( self, database: CodeQLDatabase, path: Optional[str] = None @@ -120,19 +161,41 @@ def getResults( with open(sarif, "r") as handle: data = json.load(handle) + # clean up + os.remove(sarif) + results = data.get("runs", [])[0].get("results", []) return CodeQLResults.loadSarifResults(results) - def readBqrs(self, bqrsfile: str) -> dict: + def readBqrs( + self, + bqrsfile: str, + display: bool = False, + ) -> list[list[str]]: """Read a BQRS file to get the raw results.""" - output = os.path.join(tempfile.gettempdir(), "codeql-result.bqrs") + output = os.path.join(tempfile.gettempdir(), "codeql-result.csv") + logger.debug(f"Reading BQRS file :: {bqrsfile}") self.runCommand( - "bqrs", "decode", "--format", "json", "--output", output, bqrsfile + "bqrs", + "decode", + "--no-titles", + "--format", + "csv", + "--output", + output, + bqrsfile, + display=display, ) - + results = [] with open(output, "r") as handle: - return json.load(handle) + data = csv.reader(handle, delimiter=",") + for row in data: + results.append(row) + + # clean up + os.remove(output) + return results def __str__(self) -> str: """To String.""" diff --git a/src/ghastoolkit/codeql/packs/pack.py b/src/ghastoolkit/codeql/packs/pack.py index b06c7a17..d77595ad 100644 --- a/src/ghastoolkit/codeql/packs/pack.py +++ b/src/ghastoolkit/codeql/packs/pack.py @@ -5,7 +5,6 @@ import glob import logging from typing import Any, List, Optional -from collections import OrderedDict from semantic_version import Version import yaml @@ -77,6 +76,21 @@ def load(self): for name, version in data.get("dependencies", {}).items(): self.dependencies.append(CodeQLPack(name=name, version=version)) + @staticmethod + def findByQuery(query_path: str) -> Optional["CodeQLPack"]: + """Find Pack by query path.""" + stack = query_path.split("/") + if query_path.startswith("/"): + stack.insert(0, "/") + + while len(stack) != 0: + path = os.path.join(*stack, "qlpack.yml") + if os.path.exists(path): + return CodeQLPack(path) + + stack.pop(-1) + return + def run(self, *args, display: bool = False) -> Optional[str]: """Run Pack command.""" return self.cli.runCommand("pack", *args, display=display)