Source code for sqlspec.core.parameters._processor

"""Parameter processing pipeline orchestrator."""

from collections import OrderedDict
from collections.abc import Callable, Mapping, Sequence
from typing import Any, cast

from mypy_extensions import mypyc_attr

from sqlspec.core.parameters._alignment import looks_like_execute_many
from sqlspec.core.parameters._converter import ParameterConverter
from sqlspec.core.parameters._types import (
    _NAMED_STYLE_VALUES,
    _NAMED_STYLES,
    _POSITIONAL_STYLE_VALUES,
    ConvertedParameters,
    ParameterInfo,
    ParameterPayload,
    ParameterProcessingResult,
    ParameterProfile,
    ParameterStyle,
    ParameterStyleConfig,
    TypedParameter,
    wrap_with_type,
)
from sqlspec.core.parameters._validator import ParameterValidator
from sqlspec.utils.dispatch import TypeDispatcher

__all__ = ("ParameterProcessor", "structural_fingerprint", "value_fingerprint")

# Threshold for sampling execute_many parameters instead of full iteration
_EXECUTE_MANY_SAMPLE_THRESHOLD = 10
# Number of records to sample for type signatures
_EXECUTE_MANY_SAMPLE_SIZE = 3
_OCCURRENCE_BASED_POSITIONAL_STYLES = frozenset({
    ParameterStyle.QMARK,
    ParameterStyle.POSITIONAL_COLON,
    ParameterStyle.POSITIONAL_PYFORMAT,
})

TypeCoercionFallback = tuple[type, Callable[[Any], Any]]
_TYPE_COERCION_DISPATCHERS: "dict[tuple[TypeCoercionFallback, ...], TypeDispatcher[Callable[[Any], Any]]]" = {}


def structural_fingerprint(parameters: "ParameterPayload", is_many: bool = False) -> Any:
    """Return a structural fingerprint for caching parameter payloads.

    Returns a hashable tuple representing the structure (keys, types, count).
    Avoids string formatting for performance.

    Note: Uses Python 3.7+ dict insertion order instead of sorted() for determinism.
    This means fingerprints depend on the order keys were inserted, which is typically
    consistent within a single codebase.
    """
    if parameters is None:
        return None

    # Fast type dispatch: check concrete types first (2-4x faster than ABC isinstance)
    param_type = type(parameters)

    # Handle dict (most common Mapping type) - fast path
    if param_type is dict:
        dict_params = cast("dict[str, Any]", parameters)
        if not dict_params:
            return ("dict",)
        # Use dict insertion order (Python 3.7+ guaranteed) instead of sorted()
        # This is O(n) vs O(n log n) and produces consistent fingerprints for
        # parameters constructed in the same order (typical usage pattern)
        keys = tuple(dict_params.keys())
        type_sig = tuple(type(v) for v in dict_params.values())
        return ("dict", keys, type_sig)

    # Handle list and tuple (most common Sequence types) - fast path
    if param_type is list or param_type is tuple:
        seq_params = cast("Sequence[Any]", parameters)
        if not seq_params:
            return ("seq",)

        # Optimization: Fast path for single-item sequence (extremely common)
        if len(seq_params) == 1:
            return ("seq", (type(seq_params[0]),))

        if is_many:
            return _fingerprint_execute_many(seq_params)

        # Single execution with sequence parameters
        type_sig = tuple(type(v) for v in seq_params)
        return ("seq", type_sig)

    # Fallback to ABC checks for custom types (Mapping, Sequence subclasses)
    if isinstance(parameters, Mapping):
        if not parameters:
            return ("dict",)
        keys = tuple(parameters.keys())
        type_sig = tuple(type(v) for v in parameters.values())
        return ("dict", keys, type_sig)

    if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes, bytearray)):
        if not parameters:
            return ("seq",)

        if len(parameters) == 1:
            return ("seq", (type(parameters[0]),))

        if is_many:
            return _fingerprint_execute_many(parameters)

        type_sig = tuple(type(v) for v in parameters)
        return ("seq", type_sig)

    # Scalar parameter
    return ("scalar", param_type)


def value_fingerprint(parameters: "ParameterPayload") -> Any:
    """Return a value-based fingerprint for parameter payloads.

    Unlike structural_fingerprint, this includes actual parameter VALUES in the hash.
    Used for static script compilation where SQL has values embedded directly.

    Args:
        parameters: Original parameter payload supplied by the caller.

    Returns:
        Hashable representation including parameter values.
    """
    if parameters is None:
        return None

    # Use repr for value-based hashing - includes both structure and values
    # Return as tuple to match structural_fingerprint return type (hashable)
    return ("values", repr(parameters))


[docs] @mypyc_attr(allow_interpreted_subclasses=False) class ParameterProcessor: """Parameter processing engine coordinating conversion phases.""" __slots__ = ("_cache", "_cache_hits", "_cache_max_size", "_cache_misses", "_converter", "_validator") DEFAULT_CACHE_SIZE = 1000
[docs] def __init__( self, *, converter: "ParameterConverter | None" = None, validator: "ParameterValidator | None" = None, cache_max_size: int | None = None, validator_cache_max_size: int | None = None, ) -> None: self._cache: OrderedDict[Any, ParameterProcessingResult] = OrderedDict() if cache_max_size is None: cache_max_size = self.DEFAULT_CACHE_SIZE self._cache_max_size = max(cache_max_size, 0) self._cache_hits = 0 self._cache_misses = 0 if converter is None: if validator is None: validator_cache = validator_cache_max_size if validator_cache is None: validator_cache = self._cache_max_size validator = ParameterValidator(cache_max_size=validator_cache) self._validator = validator self._converter = ParameterConverter(self._validator) else: self._converter = converter if validator is None: self._validator = converter.validator else: self._validator = validator self._converter.validator = validator if validator_cache_max_size is not None and isinstance(self._validator, ParameterValidator): self._validator.set_cache_max_size(validator_cache_max_size)
[docs] def clear_cache(self) -> None: """Clear cached processing results and reset stats.""" self._cache.clear() self._cache_hits = 0 self._cache_misses = 0 if isinstance(self._validator, ParameterValidator): self._validator.clear_cache()
[docs] def cache_stats(self) -> "dict[str, int]": """Return cache statistics for parameter processing.""" stats = { "hits": self._cache_hits, "misses": self._cache_misses, "size": len(self._cache), "max_size": self._cache_max_size, } if isinstance(self._validator, ParameterValidator): validator_stats = self._validator.cache_stats() stats["validator_hits"] = validator_stats["hits"] stats["validator_misses"] = validator_stats["misses"] stats["validator_size"] = validator_stats["size"] stats["validator_max_size"] = validator_stats["max_size"] else: stats["validator_hits"] = 0 stats["validator_misses"] = 0 stats["validator_size"] = 0 stats["validator_max_size"] = 0 return stats
def process( self, sql: str, parameters: "ParameterPayload", config: "ParameterStyleConfig", dialect: str | None = None, is_many: bool = False, wrap_types: bool = True, param_fingerprint: Any | None = None, ) -> "ParameterProcessingResult": return self._process_internal( sql, parameters, config, dialect=dialect, is_many=is_many, wrap_types=wrap_types, normalize_for_parsing=True, param_fingerprint=param_fingerprint, )
[docs] def process_for_execution( self, sql: str, parameters: "ParameterPayload", config: "ParameterStyleConfig", dialect: str | None = None, is_many: bool = False, wrap_types: bool = True, parsed_expression: Any = None, param_fingerprint: Any | None = None, ) -> "ParameterProcessingResult": """Process parameters for execution without parse normalization. Args: sql: SQL string to process. parameters: Parameter payload. config: Parameter style configuration. dialect: Optional SQL dialect. is_many: Whether this is execute_many. wrap_types: Whether to wrap parameters with type metadata. parsed_expression: Pre-parsed SQLGlot expression to preserve through pipeline. param_fingerprint: Pre-computed parameter fingerprint for cache key. Returns: ParameterProcessingResult with execution SQL and parameters. """ return self._process_internal( sql, parameters, config, dialect=dialect, is_many=is_many, wrap_types=wrap_types, normalize_for_parsing=False, parsed_expression=parsed_expression, param_fingerprint=param_fingerprint, )
def _process_internal( self, sql: str, parameters: "ParameterPayload", config: "ParameterStyleConfig", *, dialect: str | None, is_many: bool, wrap_types: bool, normalize_for_parsing: bool, parsed_expression: Any = None, param_fingerprint: Any | None = None, ) -> "ParameterProcessingResult": cache_key = None if self._cache_max_size > 0: cache_key = self._make_processor_cache_key( sql, parameters, config, is_many, dialect, wrap_types, normalize_for_parsing, param_fingerprint=param_fingerprint, ) cached_result = self._cache.get(cache_key) if cached_result is not None: self._cache.move_to_end(cache_key) self._cache_hits += 1 # For static script compilation, parameters are embedded directly in SQL. # Cache key includes parameter values, so a hit means same SQL with same values. # Return None for parameters since the driver shouldn't receive any. if config.needs_static_script_compilation: return ParameterProcessingResult( cached_result.sql, None, cached_result.parameter_profile, sqlglot_sql=cached_result.sqlglot_sql, parsed_expression=cached_result.parsed_expression, input_named_parameters=cached_result.input_named_parameters, applied_wrap_types=cached_result.applied_wrap_types, ) # Return cached SQL transformation with NEW parameters transformed # to match the cached SQL's placeholder format transformed_params = self._transform_cached_parameters( parameters, cached_result.parameter_profile, config, input_named_parameters=cached_result.input_named_parameters, is_many=is_many, apply_wrap_types=cached_result.applied_wrap_types, ) # Apply output transformer if present (it may further transform params) final_sql = cached_result.sql if config.output_transformer: final_sql, transformed_params = config.output_transformer(final_sql, transformed_params) return ParameterProcessingResult( final_sql, transformed_params, cached_result.parameter_profile, sqlglot_sql=cached_result.sqlglot_sql, parsed_expression=cached_result.parsed_expression, input_named_parameters=cached_result.input_named_parameters, applied_wrap_types=cached_result.applied_wrap_types, ) self._cache_misses += 1 param_info = self._validator.extract_parameters(sql) original_styles = {p.style for p in param_info} if param_info else set() needs_execution_conversion = self._needs_execution_placeholder_conversion(param_info, config) input_named_parameters = _named_parameters_for_style(param_info, config.default_execution_parameter_style) if config.needs_static_script_compilation and param_info and parameters and not is_many: return self._compile_static_script( sql, parameters, config, is_many, cache_key, input_named_parameters=input_named_parameters ) requires_mapping = self._needs_mapping_normalization(parameters, param_info, is_many) if ( not needs_execution_conversion and not config.type_coercion_map and not config.output_transformer and not requires_mapping ): normalized_sql = self._normalize_sql_for_parsing(sql, param_info, config) if normalize_for_parsing else sql result = ParameterProcessingResult( sql, parameters, ParameterProfile(param_info), sqlglot_sql=normalized_sql, parsed_expression=parsed_expression, input_named_parameters=input_named_parameters, applied_wrap_types=False, ) return self._store_cached_result(cache_key, result) processed_sql, processed_parameters = sql, parameters if requires_mapping: target_style = self._select_execution_style(original_styles, config) input_named_parameters = _named_parameters_for_style(param_info, target_style) mapping_plan = self._converter._build_conversion_plan( # pyright: ignore[reportPrivateUsage] param_info, target_style ) processed_sql, processed_parameters = self._converter.convert_placeholder_style( processed_sql, processed_parameters, target_style, is_many, strict_named_parameters=config.strict_named_parameters, param_info=param_info, precomputed_plan=mapping_plan, ) param_info = self._converter.convert_parameter_info_style(param_info, target_style, mapping_plan) original_styles = {target_style} needs_execution_conversion = False if needs_execution_conversion: target_style = self._select_execution_style(original_styles, config) input_named_parameters = _named_parameters_for_style(param_info, target_style) applied_wrap_types = False if processed_parameters and wrap_types: wrapped_parameters = self._wrap_parameter_types(processed_parameters) if wrapped_parameters is not processed_parameters: processed_parameters = wrapped_parameters applied_wrap_types = True if config.type_coercion_map and processed_parameters: processed_parameters = self._coerce_parameter_types(processed_parameters, config.type_coercion_map, is_many) processed_sql, processed_parameters, converted_param_info = self._convert_placeholders_for_execution( processed_sql, processed_parameters, config, param_info, original_styles, needs_execution_conversion, is_many, ) if config.output_transformer: processed_sql, processed_parameters = config.output_transformer(processed_sql, processed_parameters) final_param_info = converted_param_info if converted_param_info is not None else param_info final_profile = ParameterProfile(final_param_info) sqlglot_sql = ( self._normalize_sql_for_parsing(processed_sql, final_param_info, config) if normalize_for_parsing else processed_sql ) result = ParameterProcessingResult( processed_sql, processed_parameters, final_profile, sqlglot_sql=sqlglot_sql, parsed_expression=parsed_expression, input_named_parameters=input_named_parameters, applied_wrap_types=applied_wrap_types, ) return self._store_cached_result(cache_key, result) def _compile_static_script( self, sql: str, parameters: "ParameterPayload", config: "ParameterStyleConfig", is_many: bool, cache_key: Any | None, input_named_parameters: "tuple[str, ...]", ) -> "ParameterProcessingResult": coerced_params = parameters if config.type_coercion_map and parameters: coerced_params = self._coerce_parameter_types(parameters, config.type_coercion_map, is_many) static_sql, static_params = self._converter.convert_placeholder_style( sql, coerced_params, ParameterStyle.STATIC, is_many, strict_named_parameters=config.strict_named_parameters ) result = ParameterProcessingResult( static_sql, static_params, ParameterProfile.empty(), sqlglot_sql=static_sql, input_named_parameters=input_named_parameters, applied_wrap_types=False, ) return self._store_cached_result(cache_key, result) def _select_execution_style( self, original_styles: "set[ParameterStyle]", config: "ParameterStyleConfig" ) -> "ParameterStyle": if len(original_styles) == 1 and config.supported_execution_parameter_styles is not None: original_style = next(iter(original_styles)) if original_style in config.supported_execution_parameter_styles: return original_style return config.default_execution_parameter_style or config.default_parameter_style def _wrap_parameter_types(self, parameters: "ParameterPayload") -> "ConvertedParameters": # Fast type dispatch for common types param_type = type(parameters) if param_type is list: source = cast("list[Any]", parameters) wrapped_values: list[Any] | None = None for idx, value in enumerate(source): wrapped = wrap_with_type(value) if wrapped_values is None: if wrapped is value: continue wrapped_values = source[:idx] wrapped_values.append(wrapped) if wrapped_values is None: return parameters return wrapped_values if param_type is tuple: tuple_source = cast("tuple[Any, ...]", parameters) wrapped_values = [wrap_with_type(value) for value in tuple_source] if all(wrapped is value for wrapped, value in zip(wrapped_values, tuple_source, strict=False)): return parameters return wrapped_values if param_type is dict: source_mapping = cast("dict[str, Any]", parameters) wrapped_mapping: dict[str, Any] | None = None for key, value in source_mapping.items(): wrapped = wrap_with_type(value) if wrapped_mapping is None: if wrapped is value: continue wrapped_mapping = dict(source_mapping) wrapped_mapping[key] = wrapped if wrapped_mapping is None: return parameters return wrapped_mapping # Fallback to ABC checks for custom types if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)): wrapped_values = [wrap_with_type(value) for value in parameters] if all(wrapped is value for wrapped, value in zip(wrapped_values, parameters, strict=False)): return cast("ConvertedParameters", parameters) return wrapped_values if isinstance(parameters, Mapping): fallback_mapping: dict[str, Any] | None = None for key, value in parameters.items(): wrapped = wrap_with_type(value) if fallback_mapping is None: if wrapped is value: continue fallback_mapping = dict(parameters) fallback_mapping[key] = wrapped if fallback_mapping is None: return cast("ConvertedParameters", parameters) return fallback_mapping return None def _coerce_parameter_types( self, parameters: "ParameterPayload", type_coercion_map: "dict[type, Callable[[Any], Any]]", is_many: bool = False, ) -> "ConvertedParameters": fallback_items = _type_coercion_fallbacks(type_coercion_map) result = _coerce_parameters_payload(parameters, type_coercion_map, fallback_items, is_many) # Fast type narrowing - _coerce_parameters_payload returns object but produces concrete types if result is None: return None result_type = type(result) if result_type is dict: return result if result_type is list: return result if result_type is tuple: return result return result def _store_cached_result( self, cache_key: Any | None, result: "ParameterProcessingResult" ) -> "ParameterProcessingResult": if self._cache_max_size <= 0 or cache_key is None: return result self._cache[cache_key] = result self._cache.move_to_end(cache_key) if len(self._cache) > self._cache_max_size: self._cache.popitem(last=False) return result def _transform_cached_parameters( self, parameters: "ParameterPayload", cached_profile: "ParameterProfile", config: "ParameterStyleConfig", *, input_named_parameters: "tuple[str, ...]", is_many: bool, apply_wrap_types: bool, ) -> "ConvertedParameters": """Apply parameter transformations for a cache hit. Uses cached metadata to efficiently transform parameters without re-parsing SQL. This ensures new parameter values undergo the same transformations as the original cached request (type wrapping, coercion, named-to-positional mapping). Args: parameters: New parameter payload to transform. cached_profile: Cached ParameterProfile with execution parameter metadata. config: Parameter style configuration. input_named_parameters: Cached input named parameter order. is_many: Whether this is execute_many. apply_wrap_types: Whether to wrap parameters with type metadata. Returns: Transformed parameters matching the cached SQL's placeholder format. """ if parameters is None: return None processed = parameters if apply_wrap_types and processed: processed = self._wrap_parameter_types(processed) if config.type_coercion_map and processed: processed = self._coerce_parameter_types(processed, config.type_coercion_map, is_many) if processed: cached_styles = cached_profile.styles if input_named_parameters and any(style in _POSITIONAL_STYLE_VALUES for style in cached_styles): processed = self._map_named_to_positional( processed, input_named_parameters, is_many, strict=config.strict_named_parameters ) elif any(style in _NAMED_STYLE_VALUES for style in cached_styles): processed = self._map_positional_to_named(processed, cached_profile, is_many) if processed and config.ast_transformer is not None and not is_many: processed = self._drop_pruned_null_parameters(processed) return processed @staticmethod def _drop_pruned_null_parameters(parameters: "ConvertedParameters") -> "ConvertedParameters": """Replicate AST null-pruning on a cache hit. Adapters with a null-pruning ast_transformer replace ``None`` placeholders with literal ``NULL`` and drop the matching values on the first (uncached) compile. The transform does not re-run on cache hits, so the cached SQL keeps fewer placeholders than the freshly supplied values; this removes the ``None`` values to realign them. """ if isinstance(parameters, Mapping): return {key: value for key, value in parameters.items() if value is not None} if isinstance(parameters, (list, tuple)) and not isinstance(parameters, (str, bytes, bytearray)): cleaned = [value for value in parameters if value is not None] return tuple(cleaned) if isinstance(parameters, tuple) else cleaned return parameters def _map_positional_to_named( self, parameters: "ConvertedParameters", cached_profile: "ParameterProfile", is_many: bool ) -> "ConvertedParameters": """Map a positional sequence to a dict keyed by cached placeholder names.""" cached_param_info = cached_profile.parameters if not cached_param_info: return parameters if is_many and isinstance(parameters, (list, tuple)): rows = cast("Sequence[Any]", parameters) mapped_rows: list[Any] = [] for row in rows: if isinstance(row, Mapping): mapped_rows.append(row) continue if isinstance(row, (list, tuple)) and not isinstance(row, (str, bytes, bytearray)): mapped_rows.append({ (param.name or f"param_{param.ordinal}"): row[idx] for idx, param in enumerate(cached_param_info) if idx < len(row) }) continue mapped_rows.append(row) return mapped_rows if isinstance(parameters, Mapping): return parameters if isinstance(parameters, (list, tuple)) and not isinstance(parameters, (str, bytes, bytearray)): seq = cast("Sequence[Any]", parameters) return { (param.name or f"param_{param.ordinal}"): seq[idx] for idx, param in enumerate(cached_param_info) if idx < len(seq) } return parameters def _map_named_to_positional( self, parameters: "ConvertedParameters", named_order: "tuple[str, ...]", is_many: bool, strict: bool = False ) -> "ConvertedParameters": """Map named parameters (dict) to positional (tuple) using cached order. Args: parameters: Current parameters (dict or sequence). named_order: Tuple of parameter names in placeholder order. is_many: Whether this is execute_many. strict: Whether to raise an error if required parameters are missing. Returns: Parameters converted to positional tuple if input was dict, else unchanged. Raises: SQLSpecError: If strict is True and required parameters are missing. """ if not named_order: return parameters param_type = type(parameters) if is_many and (param_type is list or param_type is tuple): parameter_rows = cast("Sequence[Any]", parameters) updated_rows: list[Any] | None = None for idx, row in enumerate(parameter_rows): row_type = type(row) if row_type is dict: row_dict: dict[str, Any] = row if strict: missing = [name for name in named_order if name not in row_dict] if missing: from sqlspec.exceptions import SQLSpecError msg = f"Missing required parameters: {missing}" raise SQLSpecError(msg) mapped_row: Any = tuple(row_dict.get(name) for name in named_order) elif isinstance(row, Mapping): # Fallback for custom Mapping types if strict: missing = [name for name in named_order if name not in row] if missing: from sqlspec.exceptions import SQLSpecError msg = f"Missing required parameters: {missing}" raise SQLSpecError(msg) mapped_row = tuple(row.get(name) for name in named_order) else: mapped_row = row if updated_rows is None: if mapped_row is row: continue updated_rows = list(parameter_rows[:idx]) updated_rows.append(mapped_row) if updated_rows is None: return parameters if param_type is tuple: return tuple(updated_rows) return updated_rows if param_type is dict: dict_parameters = cast("dict[str, Any]", parameters) if strict: missing = [name for name in named_order if name not in dict_parameters] if missing: from sqlspec.exceptions import SQLSpecError msg = f"Missing required parameters: {missing}" raise SQLSpecError(msg) return tuple(dict_parameters.get(name) for name in named_order) # Fallback for custom Mapping types if isinstance(parameters, Mapping): if strict: missing = [name for name in named_order if name not in parameters] if missing: from sqlspec.exceptions import SQLSpecError msg = f"Missing required parameters: {missing}" raise SQLSpecError(msg) return tuple(parameters.get(name) for name in named_order) return parameters def _needs_mapping_normalization( self, payload: "ParameterPayload", param_info: "list[ParameterInfo]", is_many: bool ) -> bool: if not payload or not param_info: return False has_named_placeholders = any(param.style in _NAMED_STYLES for param in param_info) if has_named_placeholders: return False looks_many = is_many or looks_like_execute_many(payload) if not looks_many: return False # Fast type dispatch for common types payload_type = type(payload) if payload_type is dict: return True if payload_type is list or payload_type is tuple: # Check if any item is a dict (fast path) or Mapping (fallback) seq_payload = cast("Sequence[Any]", payload) for item in seq_payload: item_type = type(item) if item_type is dict: return True if isinstance(item, Mapping): return True return False # Fallback for custom types if isinstance(payload, Mapping): return True if isinstance(payload, Sequence) and not isinstance(payload, (str, bytes, bytearray)): return any(isinstance(item, Mapping) for item in payload) return False def _normalize_sql_for_parsing( self, sql: str, param_info: "list[ParameterInfo]", config: "ParameterStyleConfig" ) -> str: """Normalize SQL for sqlglot parsing by converting unsupported parameter styles. When a parameter style is not in config.supported_parameter_styles (what sqlglot can parse for this dialect), convert it to config.default_parameter_style. Args: sql: SQL string with parameters. param_info: List of detected parameter placeholders. config: Parameter style configuration. Returns: SQL string with parameters converted to a sqlglot-compatible style. """ if not self._needs_parse_normalization(param_info, config): return sql # Convert to the default style that sqlglot can parse for this dialect target_style = config.default_parameter_style normalized_sql, _ = self._converter.convert_placeholder_style( sql, None, target_style, is_many=False, param_info=param_info ) return normalized_sql def _make_processor_cache_key( self, sql: str, parameters: "ParameterPayload", config: "ParameterStyleConfig", is_many: bool, dialect: str | None, wrap_types: bool, normalize_for_parsing: bool, *, param_fingerprint: Any | None = None, ) -> tuple[Any, ...]: if param_fingerprint is None: # For static script compilation, we must include actual values in the fingerprint if config.needs_static_script_compilation: param_fingerprint = value_fingerprint(parameters) else: # Use structural fingerprint (keys + types, not values) for better cache hit rates param_fingerprint = structural_fingerprint(parameters, is_many) dialect_marker = dialect or "default" # Include both input and execution parameter styles to avoid cache collisions input_style = config.default_parameter_style.value if config.default_parameter_style else "unknown" exec_style = ( config.default_execution_parameter_style.value if config.default_execution_parameter_style else input_style ) return _make_cache_key_tuple( sql, param_fingerprint, input_style, exec_style, dialect_marker, is_many, wrap_types, normalize_for_parsing ) def _needs_execution_placeholder_conversion( self, param_info: "list[ParameterInfo]", config: "ParameterStyleConfig" ) -> bool: """Determine whether execution placeholder conversion is required.""" if config.needs_static_script_compilation: return True if not param_info: return False current_styles = {param.style for param in param_info} if ( config.allow_mixed_parameter_styles and len(current_styles) > 1 and config.supported_execution_parameter_styles is not None and len(config.supported_execution_parameter_styles) > 1 and all(style in config.supported_execution_parameter_styles for style in current_styles) ): return False if len(current_styles) > 1: return True if len(current_styles) == 1: current_style = next(iter(current_styles)) supported_styles = config.supported_execution_parameter_styles if supported_styles is None: return True return current_style not in supported_styles return True def _needs_parse_normalization(self, param_info: "list[ParameterInfo]", config: "ParameterStyleConfig") -> bool: """Check if SQL needs normalization before sqlglot parsing. A style needs normalization if it's NOT in config.supported_parameter_styles, which represents what sqlglot can parse for this driver's dialect. Args: param_info: List of detected parameter placeholders. config: Parameter style configuration with supported_parameter_styles. Returns: True if any parameter style is not supported by sqlglot for this dialect. """ supported = config.supported_parameter_styles return any(p.style not in supported for p in param_info) def _convert_placeholders_for_execution( self, sql: str, parameters: "ParameterPayload", config: "ParameterStyleConfig", param_info: "list[ParameterInfo]", original_styles: "set[ParameterStyle]", needs_execution_conversion: bool, is_many: bool, ) -> "tuple[str, ConvertedParameters, list[ParameterInfo] | None]": if not needs_execution_conversion: # Convert parameters to concrete type for return if parameters is None: return sql, None, None if isinstance(parameters, dict): return sql, parameters, None if isinstance(parameters, list): return sql, parameters, None if isinstance(parameters, tuple): return sql, parameters, None if isinstance(parameters, Mapping): return sql, dict(parameters), None if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)): return sql, list(parameters), None if len(param_info) == 1: return sql, [parameters], None return sql, None, None target_style = self._select_execution_style(original_styles, config) execution_plan = self._converter._build_conversion_plan( # pyright: ignore[reportPrivateUsage] param_info, target_style ) converted_param_info = self._converter.convert_parameter_info_style(param_info, target_style, execution_plan) if is_many and config.preserve_original_params_for_many and isinstance(parameters, (list, tuple)): processed_sql, _ = self._converter.convert_placeholder_style( sql, parameters, target_style, is_many, strict_named_parameters=config.strict_named_parameters, param_info=param_info, precomputed_plan=execution_plan, ) return processed_sql, parameters, converted_param_info processed_sql, processed_parameters = self._converter.convert_placeholder_style( sql, parameters, target_style, is_many, strict_named_parameters=config.strict_named_parameters, param_info=param_info, precomputed_plan=execution_plan, ) return processed_sql, processed_parameters, converted_param_info
def _fingerprint_execute_many(parameters: "Sequence[Any]") -> Any: """Generate fingerprint for execute_many parameters. Extracted to reduce code duplication and allow inlining of the common single-execution path. """ param_count = len(parameters) sample_size = ( min(_EXECUTE_MANY_SAMPLE_SIZE, param_count) if param_count > _EXECUTE_MANY_SAMPLE_THRESHOLD else param_count ) first = parameters[0] first_type = type(first) # Fast type dispatch for first element if first_type is dict: keys = tuple(first.keys()) type_sig = tuple(type(v) for v in first.values()) return ("many_dict", keys, type_sig, param_count) if first_type is list or first_type is tuple: type_sigs: list[tuple[type, ...]] = [] for i in range(sample_size): param_item: Any = parameters[i] type_sigs.append(tuple(type(v) for v in param_item)) return ("many_seq", tuple(type_sigs), param_count) # Fallback to ABC checks if isinstance(first, Mapping): keys = tuple(first.keys()) type_sig = tuple(type(v) for v in first.values()) return ("many_dict", keys, type_sig, param_count) if isinstance(first, Sequence) and not isinstance(first, (str, bytes)): type_sigs = [] for i in range(sample_size): param_item = parameters[i] type_sigs.append(tuple(type(v) for v in param_item)) return ("many_seq", tuple(type_sigs), param_count) # Scalar values in sequence for execute_many type_sig = tuple(type(parameters[i]) for i in range(sample_size)) return ("many_scalar", type_sig, param_count) def _type_coercion_fallbacks( type_coercion_map: "dict[type, Callable[[Any], Any]]", ) -> "tuple[TypeCoercionFallback, ...]": return tuple(type_coercion_map.items()) def _get_type_coercion_dispatcher( fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> "TypeDispatcher[Callable[[Any], Any]]": dispatcher = _TYPE_COERCION_DISPATCHERS.get(fallback_items) if dispatcher is not None: return dispatcher dispatcher = TypeDispatcher["Callable[[Any], Any]"]() dispatcher.register_all(fallback_items) _TYPE_COERCION_DISPATCHERS[fallback_items] = dispatcher return dispatcher def _resolve_type_coercion( value: object, type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> object: value_type = type(value) exact_converter = type_coercion_map.get(value_type) if exact_converter is not None: return exact_converter(value) fallback_converter = _get_type_coercion_dispatcher(fallback_items).get(value) if fallback_converter is not None: return fallback_converter(value) return value def _coerce_nested_value( value: object, type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> object: # Fast type dispatch for common types value_type = type(value) if value_type is list or value_type is tuple: seq_value = cast("Sequence[Any]", value) return [_coerce_parameter_value(item, type_coercion_map, fallback_items) for item in seq_value] if value_type is dict: dict_value = cast("dict[Any, Any]", value) return {key: _coerce_parameter_value(val, type_coercion_map, fallback_items) for key, val in dict_value.items()} return value def _coerce_parameter_value( value: object, type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> object: if value is None: return value value_type = type(value) # Fast path: check TypedParameter by type identity (2-4x faster than isinstance) if value_type is TypedParameter: typed_param = cast("TypedParameter", value) wrapped_value: object = typed_param.value if wrapped_value is None: return wrapped_value coerced = _resolve_type_coercion(wrapped_value, type_coercion_map, fallback_items) if coerced is wrapped_value: return wrapped_value return _coerce_nested_value(coerced, type_coercion_map, fallback_items) coerced = _resolve_type_coercion(value, type_coercion_map, fallback_items) if coerced is value: return value return _coerce_nested_value(coerced, type_coercion_map, fallback_items) def _coerce_sequence_preserving_identity( seq_value: "Sequence[Any]", type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> "Sequence[Any] | list[Any]": updated_seq: list[Any] | None = None for idx, item in enumerate(seq_value): coerced_value = _coerce_parameter_value(item, type_coercion_map, fallback_items) if updated_seq is None: if coerced_value is item: continue updated_seq = list(seq_value[:idx]) updated_seq.append(coerced_value) if updated_seq is None: return seq_value return updated_seq def _coerce_mapping_preserving_identity( mapping: "Mapping[Any, Any]", type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> "Mapping[Any, Any] | dict[Any, Any]": updated_mapping: dict[Any, Any] | None = None for key, val in mapping.items(): coerced_value = _coerce_parameter_value(val, type_coercion_map, fallback_items) if updated_mapping is None: if coerced_value is val: continue updated_mapping = dict(mapping) updated_mapping[key] = coerced_value if updated_mapping is None: return mapping return updated_mapping def _coerce_parameter_set( param_set: object, type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", ) -> object: # Fast type dispatch for common types param_type = type(param_set) if param_type is list: return _coerce_sequence_preserving_identity(cast("list[Any]", param_set), type_coercion_map, fallback_items) if param_type is tuple: seq_value = cast("tuple[Any, ...]", param_set) coerced_seq = _coerce_sequence_preserving_identity(seq_value, type_coercion_map, fallback_items) if coerced_seq is seq_value: return seq_value return tuple(cast("list[Any]", coerced_seq)) if param_type is dict: return _coerce_mapping_preserving_identity(cast("dict[Any, Any]", param_set), type_coercion_map, fallback_items) # Fallback to ABC checks for custom types if isinstance(param_set, Sequence) and not isinstance(param_set, (str, bytes)): seq_fallback = param_set coerced_seq = _coerce_sequence_preserving_identity(seq_fallback, type_coercion_map, fallback_items) if coerced_seq is seq_fallback: return param_set return coerced_seq if isinstance(param_set, Mapping): coerced_mapping = _coerce_mapping_preserving_identity(param_set, type_coercion_map, fallback_items) if coerced_mapping is param_set: return param_set return coerced_mapping return _coerce_parameter_value(param_set, type_coercion_map, fallback_items) def _coerce_parameters_payload( parameters: "ParameterPayload", type_coercion_map: "dict[type, Callable[[Any], Any]]", fallback_items: "tuple[TypeCoercionFallback, ...]", is_many: bool, ) -> object: # Fast type dispatch for common types param_type = type(parameters) if param_type is list: seq_params = cast("list[Any]", parameters) if is_many: updated_many: list[Any] | None = None for idx, param_set in enumerate(seq_params): coerced_set = _coerce_parameter_set(param_set, type_coercion_map, fallback_items) if updated_many is None: if coerced_set is param_set: continue updated_many = seq_params[:idx] updated_many.append(coerced_set) if updated_many is None: return seq_params return updated_many updated_seq: list[Any] | None = None for idx, item in enumerate(seq_params): coerced_item = _coerce_parameter_value(item, type_coercion_map, fallback_items) if updated_seq is None: if coerced_item is item: continue updated_seq = seq_params[:idx] updated_seq.append(coerced_item) if updated_seq is None: return seq_params return updated_seq if param_type is tuple: tuple_params = cast("tuple[Any, ...]", parameters) if is_many: return [_coerce_parameter_set(param_set, type_coercion_map, fallback_items) for param_set in tuple_params] return [_coerce_parameter_value(item, type_coercion_map, fallback_items) for item in tuple_params] if param_type is dict: dict_params = cast("dict[Any, Any]", parameters) updated_mapping: dict[Any, Any] | None = None for key, val in dict_params.items(): coerced_value = _coerce_parameter_value(val, type_coercion_map, fallback_items) if updated_mapping is None: if coerced_value is val: continue updated_mapping = dict(dict_params) updated_mapping[key] = coerced_value if updated_mapping is None: return dict_params return updated_mapping # Fallback to ABC checks for custom types if is_many and isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)): return [_coerce_parameter_set(param_set, type_coercion_map, fallback_items) for param_set in parameters] if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)): return [_coerce_parameter_value(item, type_coercion_map, fallback_items) for item in parameters] if isinstance(parameters, Mapping): return {key: _coerce_parameter_value(val, type_coercion_map, fallback_items) for key, val in parameters.items()} return _coerce_parameter_value(parameters, type_coercion_map, fallback_items) def _make_cache_key_tuple( sql: str, param_fingerprint: Any, input_style: str, exec_style: str, dialect: str | None, is_many: bool, wrap_types: bool | None = None, normalize_for_parsing: bool | None = None, ) -> tuple[Any, ...]: """Build the shared SQL and parameter processor cache key tuple.""" if wrap_types is None and normalize_for_parsing is None: return (sql, param_fingerprint, input_style, exec_style, dialect, is_many) return (sql, param_fingerprint, input_style, exec_style, dialect, is_many, wrap_types, normalize_for_parsing) def _named_parameters_for_style( param_info: "list[ParameterInfo]", target_style: "ParameterStyle | None" ) -> "tuple[str, ...]": names = tuple(p.name for p in param_info if p.name is not None) if target_style in _OCCURRENCE_BASED_POSITIONAL_STYLES: return names return tuple(dict.fromkeys(names))