Dataflow Analysis

Detect injection vulnerabilities by tracking how untrusted data flows through your code. Dataflow rules target the injection-style entries in the OWASP Top 10, such as SQL injection, command injection, and SSRF.

What is Dataflow Analysis?

Dataflow analysis tracks how data moves through your program. The security-focused form used here is commonly called taint analysis. It answers the question: "Can untrusted user input reach a dangerous function?"

SOURCE (untrusted data)
↓ flows through assignments
↓ flows through function calls
↓ flows through string operations
SINK (dangerous function)
VULNERABILITY DETECTED! 🔴
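
To make the diagram concrete, here is a minimal, self-contained sketch of a vulnerable flow. It uses Python's built-in sqlite3 module and a hard-coded string standing in for request data. The table, the find_user helper, and the payload are illustrative assumptions, not part of Code Pathfinder.

```python
import sqlite3


def find_user(conn, name):
    # String operation: the untrusted value is spliced into the SQL text.
    query = "SELECT name FROM users WHERE name = '" + name + "'"
    # SINK: the database executes whatever the string now says.
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany("INSERT INTO users VALUES (?)", [("alice",), ("bob",)])

user_input = "nobody' OR '1'='1"        # SOURCE: stands in for a request parameter
requested_name = user_input             # flows through an assignment
rows = find_user(conn, requested_name)  # flows through a function call

# The WHERE clause is now always true, so every row comes back.
print(len(rows))
```

No user is named "nobody", yet the query returns both rows. This source-to-sink path is what a dataflow rule is meant to report.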

Why is this powerful?

Instead of flagging every execute() call, you flag only the execute() calls that untrusted input can reach. Calls with constant or trusted arguments are not reported, which cuts false positives while still catching real vulnerabilities.
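
The difference can be sketched with Python's standard ast module. This is a toy comparison, not how Code Pathfinder works internally: the CODE sample is only parsed, never run, and the source is assumed to be a bare input() call.

```python
import ast

CODE = '''\
user_id = input()
execute("SELECT 1")
execute(user_id)
'''

tree = ast.parse(CODE)


def called_name(node):
    """Return the bare function name for calls like f(...), else None."""
    if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
        return node.func.id
    return None


execute_calls = [n for n in ast.walk(tree) if called_name(n) == "execute"]

# Pattern matching: every execute() call is reported.
name_only = sorted(c.lineno for c in execute_calls)

# Taint-aware: only calls whose argument was assigned from input().
tainted = {
    target.id
    for stmt in tree.body
    if isinstance(stmt, ast.Assign) and called_name(stmt.value) == "input"
    for target in stmt.targets
    if isinstance(target, ast.Name)
}
taint_aware = sorted(
    c.lineno
    for c in execute_calls
    if any(isinstance(a, ast.Name) and a.id in tainted for a in c.args)
)

print(name_only, taint_aware)  # [2, 3] [3]
```

Name matching reports lines 2 and 3. The taint-aware check reports only line 3, where user input actually arrives.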

flows() Function

The flows() function is your primary tool for dataflow analysis:

from codepathfinder import rule, flows, calls
from codepathfinder.presets import PropagationPresets

@rule(id="sql-injection", severity="critical", cwe="CWE-89")
def detect_sql_injection():
    """Detects SQL injection vulnerabilities"""
    return flows(
        from_sources=calls("request.GET", "request.POST"),
        to_sinks=calls("execute", "executemany"),
        sanitized_by=calls("escape", "quote"),
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Parameters explained:

  • from_sources - Where untrusted data comes from (user input)
  • to_sinks - Dangerous functions that should not receive untrusted data
  • sanitized_by - Functions that make data safe (optional)
  • propagates_through - How taint spreads (assignments, function calls, etc.)
  • scope - Track within one function ("local") or across functions ("global")

Sources and Sinks

Sources: Where Untrusted Data Comes From

Sources are entry points for untrusted data. Common sources include:

# Web application sources
from_sources=[
    calls("request.GET"),       # URL parameters
    calls("request.POST"),      # Form data
    calls("request.args.get"),  # Flask args
    calls("request.form.get"),  # Flask forms
    calls("request.json"),      # JSON payloads
]

# CLI application sources
from_sources=[
    calls("input"),             # User input
    calls("sys.argv"),          # Command line args
]

# File reading sources
from_sources=[
    calls("open"),
    calls("*.read"),
    calls("*.readline"),
]

Sinks: Dangerous Functions

Sinks are functions that become dangerous when receiving untrusted data:

# SQL sinks
to_sinks=[
    calls("execute"),
    calls("executemany"),
    calls("*.execute"),
    calls("*.raw"),
]

# Command execution sinks
to_sinks=[
    calls("system"),
    calls("popen"),
    calls("os.system"),
    calls("subprocess.*"),
]

# Code execution sinks
to_sinks=[
    calls("eval"),
    calls("exec"),
    calls("compile"),
]

# File operation sinks
to_sinks=[
    calls("open"),
    calls("*.write"),
    calls("*.read"),
]
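
The lists above mix bare names ("execute") with wildcard patterns ("*.execute", "subprocess.*"). As a rough mental model only, the sketch below uses shell-style matching from Python's fnmatch module to show which qualified call names such patterns would cover. Code Pathfinder's real matching rules may differ (for example, in how a bare name is matched against a method call), so treat matches_any as a hypothetical helper.

```python
from fnmatch import fnmatchcase

SINK_PATTERNS = ["execute", "*.execute", "subprocess.*"]


def matches_any(call_name, patterns):
    """Hypothetical helper: True if call_name matches any shell-style pattern."""
    return any(fnmatchcase(call_name, p) for p in patterns)


for name in ["execute", "cursor.execute", "subprocess.run", "print"]:
    print(name, matches_any(name, SINK_PATTERNS))
```

Under this model, "cursor.execute" is covered only because "*.execute" is in the list, which is one reason the examples list both forms.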

Sanitizers

Sanitizers are functions that clean or validate data, breaking the taint flow. Specifying sanitizers reduces false positives significantly.

# SQL sanitizers
sanitized_by=[
    calls("escape"),
    calls("escape_string"),
    calls("quote_sql"),
    calls("parameterize"),
]

# Command injection sanitizers
sanitized_by=[
    calls("shlex.quote"),
    calls("pipes.quote"),  # legacy alias of shlex.quote; the pipes module was removed in Python 3.13
]

# Path traversal sanitizers
sanitized_by=[
    calls("os.path.basename"),
    calls("os.path.normpath"),  # caution: normalizes the path but does not confine it to a base directory
]

# XSS sanitizers
sanitized_by=[
    calls("html.escape"),
    calls("bleach.clean"),
    calls("*.escape"),
]
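
Before listing a function in sanitized_by, it helps to check what it actually returns for hostile input. The sketch below runs three of the standard-library functions named above. posixpath is used in place of os.path only so the results are the same on every platform.

```python
import html
import posixpath  # what os.path resolves to on POSIX systems

# Path traversal: basename() drops every directory component.
print(posixpath.basename("../../etc/passwd"))          # passwd

# normpath() only tidies the path; leading ".." segments survive.
print(posixpath.normpath("uploads/../../etc/passwd"))  # ../etc/passwd

# XSS: html.escape() replaces markup characters with entities.
print(html.escape("<script>alert(1)</script>"))
# &lt;script&gt;alert(1)&lt;/script&gt;
```

Note that normpath() alone still lets a path escape its directory, so treating it as a sanitizer can hide real findings.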

Example with sanitizer:

# This will NOT be detected (sanitized)
user_input = request.GET.get("name")
safe_input = escape(user_input)
cursor.execute(f"SELECT * FROM users WHERE name='{safe_input}'")

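Because escape() is listed in sanitized_by, the call breaks the taint flow and no vulnerability is reported. The rule trusts that list as written, so only include functions that really make the data safe for the sink in question.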
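
Escaping by hand is easy to get wrong, so in application code the usual fix is a parameterized query rather than an escape() call. The sketch below uses Python's built-in sqlite3 module as a stand-in for whatever database driver you use. The table and values are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany("INSERT INTO users VALUES (?)", [("alice",), ("bob",)])

user_input = "nobody' OR '1'='1"  # stands in for request.GET.get("name")

# The ? placeholder sends the value separately from the SQL text,
# so quotes inside user_input cannot change the query structure.
rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (user_input,)
).fetchall()

print(rows)  # [] : no user is literally named "nobody' OR '1'='1"
```

How a rule treats the parameters argument of execute() is not covered here, so check a parameterized call against your rule before relying on it.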

Propagation

Propagation defines how taint spreads through code. Code Pathfinder provides presets for common patterns:

Propagation Presets

from codepathfinder.presets import PropagationPresets

# Minimal (fastest, ~60-70% coverage)
PropagationPresets.minimal()
# - Variable assignments (x = tainted)
# - Function arguments (func(tainted))

# Standard (recommended, ~75-80% coverage)
PropagationPresets.standard()
# - All minimal patterns
# - Function returns (return tainted)
# - String concatenation ("prefix" + tainted)
# - String formatting (f"{tainted}")

# Comprehensive (most thorough)
PropagationPresets.comprehensive()
# - All standard patterns
# - Additional edge cases

Recommendation: Use PropagationPresets.standard() for most security rules. It balances coverage (roughly 75-80%, per the figures above) against analysis speed.

Explicit Propagation

For fine-grained control, specify exactly how taint should propagate:

from codepathfinder import calls, flows, propagates

flows(
    from_sources=calls("input"),
    to_sinks=calls("eval"),
    propagates_through=[
        propagates.assignment(),        # x = tainted
        propagates.function_args(),     # func(tainted)
        propagates.function_returns(),  # return tainted
        propagates.string_concat(),     # "str" + tainted
        propagates.string_format(),     # f"{tainted}"
    ],
    scope="local"
)
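
To see why the choice of propagators matters, here is a toy analysis built on Python's standard ast module. It is a sketch under simplifying assumptions (top-level statements only, input() as the only source, execute() as the only sink) and is not how Code Pathfinder is implemented. The rule names in the propagate set are illustrative labels.

```python
import ast

SAMPLE = '''\
name = input()
query = "SELECT * FROM users WHERE id=" + name
execute(query)
'''


def tainted_sink_lines(code, propagate):
    """Toy analysis: lines where execute() receives data from input().

    `propagate` is a set of enabled rules, drawn from
    {"assignment", "string_concat", "string_format"}.
    """
    tainted = set()  # variable names currently holding tainted data

    def is_call_to(node, name):
        return (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == name
        )

    def is_tainted(node):
        if is_call_to(node, "input"):  # the source itself
            return True
        if isinstance(node, ast.Name):
            return node.id in tainted
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
            return "string_concat" in propagate and (
                is_tainted(node.left) or is_tainted(node.right)
            )
        if isinstance(node, ast.JoinedStr):  # f-string
            return "string_format" in propagate and any(
                isinstance(part, ast.FormattedValue) and is_tainted(part.value)
                for part in node.values
            )
        return False

    hits = []
    for stmt in ast.parse(code).body:
        if isinstance(stmt, ast.Assign):
            spreads = "assignment" in propagate and is_tainted(stmt.value)
            for target in stmt.targets:
                if isinstance(target, ast.Name):
                    # Assigning a value seen as clean clears the taint.
                    (tainted.add if spreads else tainted.discard)(target.id)
        elif isinstance(stmt, ast.Expr) and is_call_to(stmt.value, "execute"):
            if any(is_tainted(arg) for arg in stmt.value.args):
                hits.append(stmt.lineno)
    return hits


print(tainted_sink_lines(SAMPLE, {"assignment"}))                   # []
print(tainted_sink_lines(SAMPLE, {"assignment", "string_concat"}))  # [3]
```

With assignment tracking alone, the taint is lost at the concatenation and nothing is reported. Enabling string concatenation carries it through to the execute() call on line 3.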

Scope: Local vs Global

Local Scope (Intra-procedural)

Tracks taint within a single function. Faster and simpler, but won't detect vulnerabilities that cross function boundaries.

@rule(id="simple-sqli", severity="high")
def detect_simple_sql_injection():
    """Detects SQL injection within same function"""
    return flows(
        from_sources=calls("request.GET"),
        to_sinks=calls("execute"),
        propagates_through=PropagationPresets.minimal(),
        scope="local"  # Same function only
    )

Detects:

def view(request):
    user_id = request.GET.get("id")
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    # ↑ DETECTED (source and sink in same function)

Does NOT detect:

def get_user_input(request):
    return request.GET.get("id")

def view(request):
    user_id = get_user_input(request)
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    # ↑ NOT DETECTED (crosses function boundary)

Global Scope (Inter-procedural)

Tracks taint across function calls. More comprehensive but slower.

@rule(id="comprehensive-sqli", severity="critical")
def detect_comprehensive_sql_injection():
    """Detects SQL injection across function boundaries"""
    return flows(
        from_sources=calls("request.GET"),
        to_sinks=calls("execute"),
        propagates_through=PropagationPresets.standard(),
        scope="global"  # Across functions
    )

With scope="global", both examples above are detected, including the one where the tainted value is returned from get_user_input().
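
The gap between the two scopes can be sketched with a toy analysis over Python's standard ast module. This illustrates the idea only and is not Code Pathfinder's implementation. The read_request() source, the simplified sample, and the scope strings are assumptions made for the sketch.

```python
import ast

SAMPLE = '''\
def get_user_input():
    return read_request()

def view():
    user_id = get_user_input()
    execute(user_id)
'''


def call_name(node):
    """Return the bare function name for calls like f(...), else None."""
    if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
        return node.func.id
    return None


def find_flows(code, scope):
    """Toy analysis: (function, line) pairs where execute() gets tainted data."""
    functions = [n for n in ast.parse(code).body if isinstance(n, ast.FunctionDef)]

    # Calls to these names produce tainted values. Locally, only the source.
    taint_producers = {"read_request"}
    if scope == "global":
        # Add helpers that return a tainted call, until nothing changes.
        changed = True
        while changed:
            changed = False
            for func in functions:
                returns_taint = any(
                    isinstance(n, ast.Return) and call_name(n.value) in taint_producers
                    for n in ast.walk(func)
                )
                if returns_taint and func.name not in taint_producers:
                    taint_producers.add(func.name)
                    changed = True

    hits = []
    for func in functions:
        tainted = set()  # tainted variable names, reset per function
        for stmt in func.body:
            if isinstance(stmt, ast.Assign) and call_name(stmt.value) in taint_producers:
                tainted.update(t.id for t in stmt.targets if isinstance(t, ast.Name))
            elif isinstance(stmt, ast.Expr) and call_name(stmt.value) == "execute":
                if any(isinstance(a, ast.Name) and a.id in tainted for a in stmt.value.args):
                    hits.append((func.name, stmt.lineno))
    return hits


print(find_flows(SAMPLE, "local"))   # []
print(find_flows(SAMPLE, "global"))  # [('view', 6)]
```

The extra pass that summarizes helper functions is where the additional cost of inter-procedural analysis comes from in this sketch.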

Complete Examples

SQL Injection

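from codepathfinder import rule, flows, calls
from codepathfinder.presets import PropagationPresets

@rule(id="sqli-comprehensive", severity="critical", cwe="CWE-89", owasp="A03:2021")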
def detect_sql_injection():
    """Comprehensive SQL injection detection"""
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
            calls("request.args.get"),
            calls("request.form.get"),
            calls("input"),
        ],
        to_sinks=[
            calls("execute"),
            calls("executemany"),
            calls("*.execute"),
            calls("*.executemany"),
            calls("*.raw"),
        ],
        sanitized_by=[
            calls("escape"),
            calls("escape_string"),
            calls("*.escape"),
            calls("parameterize"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Command Injection

@rule(id="cmdi", severity="critical", cwe="CWE-78", owasp="A03:2021")
def detect_command_injection():
    """Detects OS command injection"""
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("system"),
            calls("popen"),
            calls("os.system"),
            calls("subprocess.*"),
        ],
        sanitized_by=[
            calls("shlex.quote"),
            calls("pipes.quote"),  # legacy alias of shlex.quote; the pipes module was removed in Python 3.13
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )
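
To show what the shlex.quote sanitizer does to a hostile value, the sketch below quotes a filename and then tokenizes the resulting command line with shlex.split, without running anything. The filename is an illustrative stand-in for untrusted input.

```python
import shlex

filename = "report.txt; rm -rf /"  # stands in for untrusted input

unsafe_cmd = "cat " + filename
safe_cmd = "cat " + shlex.quote(filename)

print(safe_cmd)  # cat 'report.txt; rm -rf /'

# shlex.split() applies POSIX shell quoting rules when tokenizing.
# After quoting, the whole value is a single argument to cat.
print(shlex.split(unsafe_cmd))
print(shlex.split(safe_cmd))
```

Where possible, pass an argument list to subprocess.run() without shell=True, so that no shell parses the value at all.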

Path Traversal

@rule(id="path-traversal", severity="high", cwe="CWE-22")
def detect_path_traversal():
    """Detects path traversal vulnerabilities"""
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("open"),
            calls("*.read"),
            calls("*.write"),
        ],
        sanitized_by=[
            calls("os.path.basename"),
            calls("os.path.normpath"),  # caution: normalizes the path but does not confine it to a base directory
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )
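
In application code, a common fix for findings from this rule is a base-directory check. The sketch below is one hypothetical way to write it. is_within_base and BASE_DIR are illustrative names, posixpath is used so results are platform-independent, and symlinks are not resolved (real code would typically call os.path.realpath first).

```python
import posixpath  # what os.path resolves to on POSIX systems

BASE_DIR = "/srv/uploads"  # illustrative base directory


def is_within_base(user_path, base=BASE_DIR):
    """Hypothetical check: does base/user_path stay inside base?"""
    full = posixpath.normpath(posixpath.join(base, user_path))
    return posixpath.commonpath([base, full]) == base


print(is_within_base("reports/q1.txt"))    # True
print(is_within_base("../../etc/passwd"))  # False
print(is_within_base("/etc/passwd"))       # False
```

If you adopt a helper like this, you could list it in sanitized_by for your own codebase, in the same way validate_url-style helpers are listed for SSRF.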

Server-Side Request Forgery (SSRF)

@rule(id="ssrf", severity="high", cwe="CWE-918", owasp="A10:2021")
def detect_ssrf():
    """Detects SSRF vulnerabilities"""
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
        ],
        to_sinks=[
            calls("requests.get"),
            calls("requests.post"),
            calls("urllib.request.urlopen"),
        ],
        sanitized_by=[
            calls("validate_url"),
            calls("is_safe_url"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )
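
The validate_url and is_safe_url sanitizers above are names for functions you would write yourself. As a minimal sketch of what such a function might look like, the version below allows only known hosts over HTTP(S). The allowlist contents are illustrative, and the sketch does not follow redirects or resolve DNS.

```python
from urllib.parse import urlsplit

ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_HOSTS = {"api.example.com"}  # illustrative allowlist


def is_safe_url(url):
    """Hypothetical validator: allow only known hosts over HTTP(S)."""
    parts = urlsplit(url)
    return parts.scheme in ALLOWED_SCHEMES and parts.hostname in ALLOWED_HOSTS


print(is_safe_url("https://api.example.com/v1/users"))          # True
print(is_safe_url("http://169.254.169.254/latest/meta-data/"))  # False
print(is_safe_url("file:///etc/passwd"))                        # False
print(is_safe_url("https://api.example.com@evil.test/"))        # False
```

The last case shows why the check uses the parsed hostname rather than a substring match on the raw URL.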