-
Notifications
You must be signed in to change notification settings - Fork 92
New transpiler techniques (gate optmization): U3GateFusion, RotationGateFusion and InverseCancellation #1606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
3c5bec3
fa6c783
45c7899
1c3e1fe
0aa50c6
16b2476
9b38e35
cfa0342
5142c60
372808d
96f55e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| from typing import Optional | ||
|
|
||
| import networkx as nx | ||
| import numpy as np | ||
|
|
||
| from qibo import gates | ||
| from qibo.config import raise_error | ||
|
|
@@ -66,3 +67,311 @@ def __call__(self, circuit: Circuit): | |
| else: | ||
| new.add(fgate) | ||
| return new | ||
|
|
||
|
|
||
| class InverseCancellation(Optimizer): | ||
|
|
||
| def __init__(self): | ||
| """Identifies and removes pairs of adjacent gates from | ||
| a quantum circuit. | ||
| """ | ||
|
|
||
| self.inverse_gates = ( | ||
| gates.H | gates.Y | gates.Z | gates.X | gates.CNOT | gates.CZ | gates.SWAP | ||
| ) | ||
| self.pos_rm_inv_gate = [] | ||
carlos-luque marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def __call__(self, circuit: Circuit) -> Circuit: | ||
carlos-luque marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Performs pattern recognition to detect and eliminate adjacent gate pairs | ||
| in the quantum circuit. | ||
|
|
||
| Args: | ||
| circuit (:class:`qibo.models.circuit.Circuit`): Circuit to have gates identified. | ||
|
|
||
| Returns: | ||
| circuit (:class:`qibo.models.circuit.Circuit`): A new circuit with the pairs of | ||
| adjacent gates removed. | ||
| """ | ||
|
|
||
| if 0 == circuit.ngates or circuit.gates_of_type(gates.FusedGate): | ||
| return circuit | ||
|
|
||
| self.__find_pos_rm(circuit.nqubits, circuit.queue) | ||
|
|
||
| if 0 == len(self.pos_rm_inv_gate): | ||
| return circuit | ||
|
|
||
| tmp_new_circuit = circuit.copy(True) | ||
| new_circuit = circuit.__class__(**tmp_new_circuit.init_kwargs) | ||
| for i, gate in enumerate(tmp_new_circuit.queue): | ||
| if not i in self.pos_rm_inv_gate: | ||
| new_circuit.add(gate) | ||
| return new_circuit | ||
|
|
||
| def __find_pos_rm(self, number_qubits: int, list_gates: list): | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Identifies and marks pairs of inverse gates that | ||
| can be removed from the circuit. | ||
|
|
||
| Args: | ||
| number_qubits (int): number of qubits | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| list_gates (list): a list of gates (:class:`qibo.gates.abstract.Gate`) | ||
| """ | ||
|
|
||
| previous_gates = [None] * number_qubits | ||
|
|
||
| for i, gate in enumerate(list_gates): | ||
|
|
||
| primary_qubit = gate.init_args[0] | ||
| other_qubits = gate.init_args[1:] | ||
|
|
||
| if previous_gates[primary_qubit] is None: | ||
| self.__update_previous_gates(gate.init_args, i, gate, previous_gates) | ||
|
|
||
| elif isinstance(gate, self.inverse_gates): | ||
| same_gate = self.__same_gates(gate, previous_gates[primary_qubit][1]) | ||
|
|
||
| secondary_match = True | ||
| if other_qubits: # for multi-qubits gates | ||
| conditions = [] | ||
|
|
||
| for q in other_qubits: | ||
| has_previous_gate = previous_gates[q] is not None | ||
| is_same_gate = self.__same_gates(gate, previous_gates[q][1]) | ||
| conditions.append(has_previous_gate and is_same_gate) | ||
|
|
||
| secondary_match = all(conditions) | ||
|
|
||
| if same_gate and secondary_match: | ||
| self.pos_rm_inv_gate.extend([previous_gates[primary_qubit][0], i]) | ||
|
|
||
| for q in gate.init_args: | ||
| previous_gates[q] = None | ||
| else: | ||
| self.__update_previous_gates( | ||
| gate.init_args, i, gate, previous_gates | ||
| ) | ||
| else: | ||
| self.__update_previous_gates(gate.init_args, i, gate, previous_gates) | ||
|
|
||
| @staticmethod | ||
| def __update_previous_gates(qubits, idx, gate, previous_gates): | ||
| """Helper function to update previous gate tracking.""" | ||
|
|
||
| for q in qubits: | ||
| previous_gates[q] = (idx, gate) | ||
|
|
||
| @staticmethod | ||
| def __same_gates(gate1: gates.Gate, gate2: gates.Gate) -> bool: | ||
| """Determines whether two gates are considered the same. | ||
|
|
||
| Args: | ||
| gate1: The first gate (:class:`qibo.gates.abstract.Gate`). | ||
| gate2: The second gate (:class:`qibo.gates.abstract.Gate`). | ||
|
|
||
| Returns: | ||
| True if the gates are the same, otherwise False. | ||
| """ | ||
|
|
||
| if gate1 is None or gate2 is None: | ||
| return False | ||
|
|
||
| paramaters_gate1 = (gate1.name, gate1.init_args, gate1.init_kwargs) | ||
| paramaters_gate2 = (gate2.name, gate2.init_args, gate2.init_kwargs) | ||
|
|
||
| return bool(paramaters_gate1 == paramaters_gate2) | ||
|
|
||
|
|
||
| class RotationGateFusion(Optimizer): | ||
|
|
||
| def __init__(self): | ||
| """Identifies and fuse rotated gates (RX, RY, RZ) from | ||
| a quantum circuit. | ||
| """ | ||
|
|
||
| self.rotated_gates = gates.RX | gates.RY | gates.RZ | ||
| self.gates = [] | ||
|
|
||
| def __call__(self, circuit: Circuit) -> Circuit: | ||
carlos-luque marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Finds and combines RX, RY, and RZ rotation gates in a quantum circuit. | ||
|
|
||
| Args: | ||
| circuit (:class:`qibo.models.circuit.Circuit`): Circuit to have gates identified. | ||
|
|
||
| Returns: | ||
| circuit (:class:`qibo.models.circuit.Circuit`): A new circuit with the | ||
| fuse-rotated gates removed | ||
| """ | ||
|
|
||
| if 0 == circuit.ngates or circuit.gates_of_type(gates.FusedGate): | ||
| return circuit | ||
|
|
||
| tmp_new_circuit = circuit.copy(True) | ||
| self.__merge_rotation_gates(tmp_new_circuit.nqubits, tmp_new_circuit.queue) | ||
|
|
||
| new_circuit = circuit.__class__(**tmp_new_circuit.init_kwargs) | ||
| for gate in self.gates: | ||
| new_circuit.add(gate) | ||
| return new_circuit | ||
|
|
||
| def __merge_rotation_gates(self, number_qubits: int, list_gates: list): | ||
| """Identifies and accumulates rotation angles for | ||
| consecutive rotation gates of the same type. | ||
|
|
||
| Args: | ||
| number_qubits (int): number of qubits. | ||
| list_gates (list): a list of gates (:class:`qibo.gates.abstract.Gate`). | ||
| """ | ||
|
|
||
| previous_gates = [None] * number_qubits | ||
|
|
||
| for gate in list_gates: | ||
|
|
||
| primary_qubit = gate.init_args[0] | ||
|
|
||
| if isinstance(gate, self.rotated_gates): | ||
| prev_gate = previous_gates[primary_qubit] | ||
| if isinstance(prev_gate, gate.__class__): | ||
| tmp_gate = gate.__class__( | ||
| primary_qubit, prev_gate.parameters[0] + gate.parameters[0] | ||
| ) | ||
| previous_gates[primary_qubit] = tmp_gate | ||
| else: | ||
| if isinstance(prev_gate, self.rotated_gates): | ||
| self.gates.append(prev_gate) | ||
| previous_gates[primary_qubit] = gate | ||
| else: | ||
| # Flush stored rotations before adding new gate | ||
| for q in gate.init_args: | ||
| if isinstance(previous_gates[q], self.rotated_gates): | ||
| self.gates.append(previous_gates[q]) | ||
| previous_gates[q] = None | ||
|
|
||
| self.gates.append(gate) | ||
| for q in gate.qubits: | ||
| previous_gates[q] = gate | ||
|
Comment on lines
+229
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can I ask you to add some comments here please? because I kind of get the idea but I am getting lost in all these nested checks. Even some more separation, in the sense of adding auxiliary functions with understandable names, would improve readability.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first and second conditions verify whether the gate pair consists of rotation operators of the same type. |
||
|
|
||
| # Append any remaining rotation gates | ||
| self.gates.extend( | ||
| g for g in previous_gates if isinstance(g, self.rotated_gates) | ||
| ) | ||
|
|
||
|
|
||
| class U3GateFusion(Optimizer): | ||
|
|
||
| def __init__(self): | ||
| """Merge pairs of U3 gates in the circuit""" | ||
|
|
||
| self.gates = [] | ||
|
|
||
| def __call__(self, circuit: Circuit) -> Circuit: | ||
| """Optimize the circuit by merging U3 gate pairs. | ||
|
|
||
| Args: | ||
| circuit (:class:`qibo.models.circuit.Circuit`): Circuit to have gates identified. | ||
|
|
||
| Returns: | ||
| circuit (:class:`qibo.models.circuit.Circuit`): A new circuit where pairs of | ||
| U3 gates were merged. | ||
| """ | ||
|
|
||
| if 0 == circuit.ngates or circuit.gates_of_type(gates.FusedGate): | ||
| return circuit | ||
|
|
||
| tmp_new_circuit = circuit.copy(True) | ||
| self.__merge_u3gates(tmp_new_circuit.nqubits, tmp_new_circuit.queue) | ||
|
|
||
| new_circuit = circuit.__class__(**tmp_new_circuit.init_kwargs) | ||
|
|
||
| for gate in self.gates: | ||
| new_circuit.add(gate) | ||
| return new_circuit | ||
|
|
||
| def __merge_u3gates(self, number_qubits: int, list_gates: list): | ||
| """Identifies pairs of U3 gates that can be merged. | ||
|
|
||
| Args: | ||
| number_qubits (int): number of qubits. | ||
| list_gates (list): a list of gates (:class:`qibo.gates.abstract.Gate`). | ||
| """ | ||
|
|
||
| previous_gates = [None] * number_qubits | ||
|
|
||
| for gate in list_gates: | ||
|
|
||
| primary_qubit = gate.init_args[0] # Extract primary qubit | ||
|
|
||
| if isinstance(gate, gates.U3): | ||
| prev_gate = previous_gates[primary_qubit] | ||
| if isinstance(prev_gate, gates.U3): | ||
| tmp_gate = self.__create_u3_fusion(primary_qubit, prev_gate, gate) | ||
| previous_gates[primary_qubit] = tmp_gate | ||
| else: | ||
| previous_gates[primary_qubit] = gate | ||
| else: | ||
| # Flush stored U3 before adding new gate | ||
| for q in gate.init_args: | ||
| if isinstance(previous_gates[q], gates.U3): | ||
| self.gates.append(previous_gates[q]) | ||
| previous_gates[q] = None | ||
|
|
||
| self.gates.append(gate) | ||
| for q in gate.qubits: | ||
| previous_gates[q] = gate # Track current gate | ||
|
|
||
| # Append any remaining U3 gates in one pass | ||
| self.gates.extend(g for g in previous_gates if isinstance(g, gates.U3)) | ||
|
Comment on lines
+287
to
+320
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is basically the same routine of the previous gate merger, I think you could avoid repeating this and write a generic class that searches and merges mergeable gates, naturally the rules of mergeability and search are going to be target specific. Now that I think about it, even the cancellation routine could be seen as a special case of this. class GatesFuser(ABC, Optimizer):
# this will be totally generic
def __call__(self, circuit):
...
# these are object specific
@abstractmethod
def _are_fusable(g1, g2):
pass
@abstractmethod
def fuse(g1, g2):
pass
class InverseCancellation(GatesFuser)
...
class RotationFuser(GatesFuser)
...
class U3Fuser(GatesFuser):
...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The third nested condition verifies whether the preceding gate (prev_gate) is a rotation operator of a different type from the current gate (gate). This check is unnecessary in the case of a U3U3 gate merger. I attempted to modify the routine following the approach proposed for inverse gate cancellation in order to simplify the implementation; however, this resulted in the loss of the original gate ordering within the circuit. A generic class appears to be a promising design choice. However, in the current implementation, the only common generic method is call.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah and that is exactly what you want right? The class GatesFuser(ABC, Optimizer):
def __init__(self):
self.gates = []
@property
@abstractmethod
def gate_type(self) -> Type:
pass
def __call__(self, circuit):
circuit = circuit.copy(True)
nqubits = circuit.nqubits
previous_gates = [None] * nqubits
for gate in circuit.queue:
primary_qubit = gate.init_args[0]
if isinstance(gate, self.gate_type):
prev_gate = previous_gates[primary_qubit]
if self.are_fusable(gate, prev_gate):
tmp_gate = self.fuse(gate, prev_gate)
previous_gates[primary_qubit] = tmp_gate
else:
self.not_fusable_post_processing(prev_gate)
previous_gates[primary_qubit] = gate
else:
# Flush stored gates before adding new gate
for q in gate.init_args:
if isinstance(previous_gates[q], self.gate_type):
self.gates.append(previous_gates[q])
previous_gates[q] = None
self.gates.append(gate)
for q in gate.qubits:
previous_gates[q] = gate
# Append any remaining gates in one pass
self.gates.extend(g for g in previous_gates if isinstance(g, self.gate_type))
new_circuit = circuit.__class__(**circuit.init_kwargs)
for gate in self.gates:
new_circuit.add(gate)
return new_circuit
@abstractmethod
def are_fusable(self, g1, g2):
pass
@abstractmethod
def fuse(self, g1, g2):
pass
def not_fusable_post_processing(self, previous_gate):
pass
class RotationFuser(GatesFuser):
def gate_type(self) -> Type:
return gates.RX | gates.RY | gates.RZ
def are_fusable(self, g1, g2):
return isinstance(g1, g2.__class__)
def fuse(self, g1, g2):
return g1.__class__(
g1.init_args[0], g1.parameters[0] + g2[0]
)
def not_fusable_post_processing(self, previous_gate):
if isinstance(previous_gate, self.gate_type):
self.gates.append(previous_gate)
class U3Fuser(GatesFuser):
def gate_type(self) -> Type:
return gates.U3
def are_fusable(self, g1, g2):
return isinstance(g2, gates.U3)
def fuse(self, g1, g2):
qubit = g1.init_args[0]
# your _create_u3_fusion
...
@staticmethod
def _extract_u3_params(unitary_matrix: np.ndarray):
...This is just a quick draft that I made, there surely are improvements to be made in several points, but roughly should give you the idea. The inverse cancellation can be similarly adapted to fit in this design with, in case, some further tuning of the abstract class I believe. I would personally route for this solution (or something similar) due to it's way better maintainability, readability and flexibility. |
||
|
|
||
| @staticmethod | ||
| def __extract_u3_params(unitary_matrix: np.ndarray): | ||
| """Extracts the theta, phi, and lambda parameters from a fused U3 unitary matrix. | ||
|
|
||
| Args: | ||
| unitary_matrix (np.ndarray): a unitary matrix. | ||
| """ | ||
|
|
||
| theta_r = 2 * np.arccos( | ||
| np.sqrt(np.abs(unitary_matrix[0, 0] * unitary_matrix[1, 1])) | ||
| ) | ||
| sin_r = np.sin(theta_r / 2) | ||
| cos_r = np.cos(theta_r / 2) | ||
|
|
||
| if 0 == cos_r: | ||
| lambda_r = -1j * np.log(-1 * unitary_matrix[0, 1]) | ||
| phi_r = -1j * np.log(unitary_matrix[1, 0]) | ||
| elif 0 == sin_r: | ||
| lambda_r = -1j * np.log(unitary_matrix[1, 1]) | ||
| phi_r = 1j * np.log(unitary_matrix[0, 0]) | ||
| else: | ||
| phi_r = -1j * np.log( | ||
| (unitary_matrix[1, 0] * unitary_matrix[1, 1]) / (sin_r * cos_r) | ||
| ) | ||
| lambda_r = -1j * np.log( | ||
| (sin_r * unitary_matrix[1, 1]) / (unitary_matrix[1, 0] * cos_r) | ||
| ) | ||
|
|
||
| return theta_r, np.real(phi_r), np.real(lambda_r) | ||
|
|
||
| @staticmethod | ||
| def __create_u3_fusion( | ||
| qubit: int, gate1: gates.Gate, gate2: gates.Gate | ||
| ) -> gates.Gate: | ||
| """Create a U3 gate using two U3 gates. | ||
|
|
||
| Args: | ||
| qubit: Ids of the qubit to apply the gate U3. | ||
| gate1: The first gate (:class:`qibo.gates.abstract.Gate`). | ||
| gate2: The second gate (:class:`qibo.gates.abstract.Gate`). | ||
|
|
||
| Returns: | ||
| :class:`qibo.gates.Gate`: a gate representing a fused U3 unitary matrix. | ||
| """ | ||
|
|
||
| if gate1 is None or gate2 is None: | ||
| raise_error(ValueError, "__create_u3_fusion: a/two gate/s is/are None.") | ||
|
|
||
| if not isinstance(gate1, gates.U3) or not isinstance(gate2, gates.U3): | ||
| raise_error(ValueError, "__create_u3_fusion: a/two gate/s is/are not U3.") | ||
|
|
||
| u_final = np.dot(gate2.matrix(), gate1.matrix()) | ||
| theta, phi, lam = U3GateFusion.__extract_u3_params(u_final) | ||
| return gates.U3(qubit, theta, phi, lam) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering if there was an easier (less convoluted and slightly more readable) way to implement this.
Namely, I would probably first identify which gates act on which qubits and build a mapping from the qubit to the index in the queue. Then for each qubit I would scan for gate pairs that cancel out and append them to the list of gates to remove (as you already do). Something like this:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This proposal is valid in general, but it fails in the presence of N-qubit gates.
For instance, consider a three-qubit circuit with the sequence: CNOT(0,1), X(1), Y(2), CNOT(0,1).
The transformation yields X(1) Y(2), which is incorrect.