Source code for pennylane.estimator.estimate

# Copyright 2025 Xanadu Quantum Technologies Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Core resource estimation logic."""
from collections import defaultdict
from collections.abc import Callable, Iterable
from functools import singledispatch, wraps

from pennylane.measurements.measurements import MeasurementProcess
from pennylane.operation import Operation, Operator
from pennylane.queuing import AnnotatedQueue, QueuingManager
from pennylane.wires import Wires
from pennylane.workflow.qnode import QNode

from .resource_config import ResourceConfig
from .resource_mapping import _map_to_resource_op
from .resource_operator import CompressedResourceOp, GateCount, ResourceOperator
from .resources_base import DefaultGateSet, Resources
from .wires_manager import Allocate, Deallocate, WireResourceManager

# pylint: disable=too-many-arguments


[docs] def estimate( workflow: Callable | ResourceOperator | Resources | QNode, gate_set: set[str] | None = None, zeroed: int = 0, any_state: int = 0, tight_budget: bool = False, config: ResourceConfig | None = None, ) -> Resources | Callable[..., Resources]: r"""Estimate the quantum resources required by a circuit or operator with respect to a given gateset. Args: workflow (Callable | :class:`~.pennylane.estimator.resource_operator.ResourceOperator` | :class:`~.pennylane.estimator.resources_base.Resources`): The quantum circuit or operator for which to estimate resources. gate_set (set[str] | None): A set of names (strings) of the fundamental operators to track counts for throughout the quantum workflow. zeroed (int | None): Number of zeroed state work wires. Default is ``0``. any_state (int | None): Number of work wires in an unknown state. Default is ``0``. tight_budget (bool | None): Determines whether extra zeroed state wires may be allocated when they exceed the available amount. The default is ``False``. config (:class:`~.pennylane.estimator.resource_config.ResourceConfig` | None): A ResourceConfig object which modifies default behaviour in the estimation pipeline. Returns: :class:`~.pennylane.estimator.resource_operator.Resources` | Callable[..., Resources]: The estimated quantum resources required to execute the circuit. Raises: TypeError: could not obtain resources for workflow of type :code:`type(workflow)` **Example** The resources of a quantum workflow can be estimated by passing the quantum function describing the workflow directly into this function. .. code-block:: python import pennylane.estimator as qre def my_circuit(): for w in range(2): qre.Hadamard(wires=w) qre.CNOT(wires=[0,1]) qre.RX(wires=0) qre.RY(wires=1) qre.QFT(num_wires=3, wires=[0, 1, 2]) return Note that a python function is passed here, not a :class:`~.QNode`. The resources for this workflow are then obtained by: >>> import pennylane.estimator as qre >>> config = qre.ResourceConfig() >>> config.set_single_qubit_rot_precision(1e-4) >>> res = qre.estimate( ... my_circuit, ... gate_set = qre.DefaultGateSet, ... config = config, ... )() ... >>> print(res) --- Resources: --- Total qubits: 3 Total gates : 279 Qubit breakdown: clean qubits: 0, dirty qubits: 0, algorithmic qubits: 3 Gate breakdown: {'Hadamard': 5, 'CNOT': 10, 'T': 264} """ return _estimate_resources_dispatch(workflow, gate_set, zeroed, any_state, tight_budget, config)
@singledispatch def _estimate_resources_dispatch( workflow: Callable | ResourceOperator | Resources | QNode, gate_set: set[str] | None = None, zeroed: int = 0, any_state: int = 0, tight_budget: bool = False, config: ResourceConfig | None = None, ) -> Resources | Callable[..., Resources]: """Internal singledispatch function for resource estimation.""" raise TypeError( f"Could not obtain resources for workflow of type {type(workflow)}. workflow must be one of Resources, Callable, ResourceOperator, or list" ) @_estimate_resources_dispatch.register def _resources_from_qfunc( workflow: Callable, gate_set: set[str] | None = None, zeroed: int = 0, any_state: int = 0, tight_budget: bool = False, config: ResourceConfig | None = None, ) -> Callable[..., Resources]: """Estimate resources for a quantum function which queues operators""" if isinstance(workflow, QNode): workflow = workflow.func @wraps(workflow) def wrapper(*args, **kwargs): with AnnotatedQueue() as q: workflow(*args, **kwargs) wire_manager = WireResourceManager(zeroed, any_state, 0, tight_budget) num_algo_qubits = 0 circuit_wires = [] for op in q.queue: if isinstance(op, (ResourceOperator, Operator, MeasurementProcess)): if op.wires: circuit_wires.append(op.wires) elif op.num_wires: num_algo_qubits = max(num_algo_qubits, op.num_wires) else: raise ValueError( f"Queued object '{op}' is not a ResourceOperator or Operator, and cannot be processed." ) num_algo_qubits += len(Wires.all_wires(circuit_wires)) wire_manager.algo_wires = num_algo_qubits # Obtain resources in the gate_set compressed_res_ops_list = _ops_to_compressed_reps(q.queue) gate_counts = defaultdict(int) for cmp_rep_op in compressed_res_ops_list: _update_counts_from_compressed_res_op( cmp_rep_op, gate_counts, wire_manager=wire_manager, gate_set=gate_set, config=config ) return Resources( zeroed=wire_manager.zeroed, any_state=wire_manager.any_state, algo_wires=wire_manager.algo_wires, gate_types=gate_counts, ) return wrapper @_estimate_resources_dispatch.register def _resources_from_resource( workflow: Resources, gate_set: set[str] | None = None, zeroed: int = 0, any_state: int = 0, tight_budget: bool = None, config: ResourceConfig | None = None, ) -> Resources: """Further process resources from a Resources object (i.e. a Resources object that contains high-level operators can be analyzed with respect to a lower-level gate set).""" wire_manager = WireResourceManager(zeroed, any_state, workflow.algo_wires, tight_budget) gate_counts = defaultdict(int) for cmpr_rep_op, count in workflow.gate_types.items(): _update_counts_from_compressed_res_op( cmpr_rep_op, gate_counts, wire_manager=wire_manager, gate_set=gate_set, scalar=count, config=config, ) return Resources( zeroed=wire_manager.zeroed, any_state=wire_manager.any_state, algo_wires=wire_manager.algo_wires, gate_types=gate_counts, ) @_estimate_resources_dispatch.register def _resources_from_resource_operator( workflow: ResourceOperator, gate_set: set[str] | None = None, zeroed: int = 0, any_state: int = 0, tight_budget: bool = None, config: ResourceConfig | None = None, ) -> Resources: """Extract resources from a resource operator.""" resources = 1 * workflow return _resources_from_resource( workflow=resources, gate_set=gate_set, zeroed=zeroed, any_state=any_state, tight_budget=tight_budget, config=config, ) @_estimate_resources_dispatch.register def _resources_from_pl_ops( workflow: Operation, gate_set: set[str] | None = None, zeroed: int = 0, any_state: int = 0, tight_budget: bool = None, config: ResourceConfig | None = None, ) -> Resources: """Extract resources from a pl operator.""" workflow = _map_to_resource_op(workflow) resources = 1 * workflow return _resources_from_resource( workflow=resources, gate_set=gate_set, zeroed=zeroed, any_state=any_state, tight_budget=tight_budget, config=config, ) def _update_counts_from_compressed_res_op( comp_res_op: CompressedResourceOp, gate_counts_dict: dict, wire_manager: WireResourceManager, gate_set: set[str] | None = None, scalar: int = 1, config: ResourceConfig | None = None, ) -> None: """Modifies the `gate_counts_dict` argument by adding the (scaled) resources of the operator provided. Args: comp_res_op (:class:`~.pennylane.estimator.resource_operator.CompressedResourceOp`): operator in compressed representation to extract resources from gate_counts_dict (dict): base dictionary to modify with the resource counts wire_manager (:class:`~.pennylane.estimator.wires_manager.WireResourceManager`): the `WireResourceManager` that tracks and manages the `zeroed`, `any_state`, and `algo_wires` wires. gate_set (set[str]): the set of operators to track resources with respect to scalar (int | None): optional scalar to multiply the counts. Defaults to 1. config (dict | None): additional parameters to specify the resources from an operator. Defaults to :class:`pennylane.estimator.resource_config.ResourceConfig`. """ if gate_set is None: gate_set = DefaultGateSet if config is None: config = ResourceConfig() ## Early return if compressed resource operator is already in our defined gate set if comp_res_op.name in gate_set: gate_counts_dict[comp_res_op] += scalar return ## Otherwise need to use its resource decomp to extract the resources decomp_func, kwargs = _get_decomposition(comp_res_op, config) params = {key: value for key, value in comp_res_op.params.items() if value is not None} filtered_kwargs = {key: value for key, value in kwargs.items() if key not in params} resource_decomp = decomp_func(**params, **filtered_kwargs) qubit_alloc_sum = _sum_allocated_wires(resource_decomp) for action in resource_decomp: if isinstance(action, GateCount): _update_counts_from_compressed_res_op( action.gate, gate_counts_dict, wire_manager=wire_manager, scalar=scalar * action.count, gate_set=gate_set, config=config, ) continue if isinstance(action, Allocate): # When qubits are allocated and deallocate in equal numbers, we allocate and deallocate # in series, meaning we don't need to apply the scalar if qubit_alloc_sum != 0: wire_manager.grab_zeroed(action.num_wires * scalar) else: wire_manager.grab_zeroed(action.num_wires) if isinstance(action, Deallocate): if qubit_alloc_sum != 0: wire_manager.free_wires(action.num_wires * scalar) else: wire_manager.free_wires(action.num_wires) return def _sum_allocated_wires(decomp): """Sum together the allocated and released wires in a decomposition.""" s = 0 for action in decomp: if isinstance(action, Allocate): s += action.num_wires if isinstance(action, Deallocate): s -= action.num_wires return s @QueuingManager.stop_recording() def _ops_to_compressed_reps( ops: Iterable[Operator | ResourceOperator], ) -> list[CompressedResourceOp]: """Convert the sequence of operators to a list of compressed resource ops. Args: ops (Iterable[Union[Operator, :class:`~.pennylane.estimator.resource_operator.ResourceOperator`]]): set of operators to convert Returns: List[CompressedResourceOp]: set of converted compressed resource ops """ cmp_rep_ops = [] for op in ops: # Skipping measurement processes here if isinstance(op, ResourceOperator): cmp_rep_ops.append(op.resource_rep_from_op()) elif isinstance(op, Operator): cmp_rep_ops.append(_map_to_resource_op(op).resource_rep_from_op()) return cmp_rep_ops def _get_decomposition( comp_res_op: CompressedResourceOp, config: ResourceConfig ) -> tuple[Callable, dict]: """ Selects the appropriate decomposition function and kwargs from a config object. This helper function centralizes the logic for choosing a decomposition, handling standard, custom, and symbolic operator rules using a mapping. Args: comp_res_op (:class:`~.pennylane.estimator.resource_operator.CompressedResourceOp`): The operator to find the decomposition for. config (:class:`~.pennylane.estimator.resource_config.ResourceConfig`): The configuration object containing decomposition rules. Returns: A tuple containing the decomposition function and its associated kwargs. """ op_type = comp_res_op.op_type # TODO: Restore logic to handle symbolic operators when symboli operators are merged. kwargs = config.resource_op_precisions.get(op_type, {}) decomp_func = config.custom_decomps.get(op_type, op_type.resource_decomp) return decomp_func, kwargs