-
Notifications
You must be signed in to change notification settings - Fork 92
New transpiler techniques (gate optmization): U3GateFusion, RotationGateFusion and InverseCancellation #1606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
3c5bec3
fa6c783
45c7899
1c3e1fe
0aa50c6
16b2476
9b38e35
cfa0342
5142c60
372808d
96f55e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| from typing import Optional | ||
|
|
||
| import networkx as nx | ||
| import numpy as np | ||
|
|
||
| from qibo import gates | ||
| from qibo.config import raise_error | ||
|
|
@@ -66,3 +67,261 @@ def __call__(self, circuit: Circuit): | |
| else: | ||
| new.add(fgate) | ||
| return new | ||
|
|
||
|
|
||
| class InverseCancellation(Optimizer): | ||
| """ | ||
| Identifies and removes pairs of adjacent gates from a quantum circuit. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| self.inverse_gates = ( | ||
| gates.H | gates.Y | gates.Z | gates.X | gates.CNOT | gates.CZ | gates.SWAP | ||
| ) | ||
| self.pos_rm_inv_gate = [] | ||
carlos-luque marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def __call__(self, circuit: Circuit) -> Circuit: | ||
carlos-luque marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self.__find_pos_rm(circuit) | ||
|
|
||
| if 0 == len(self.pos_rm_inv_gate) or circuit.gates_of_type(gates.FusedGate): | ||
| return circuit | ||
|
|
||
| tmp_new_circuit = circuit.copy(True) | ||
|
|
||
| new_circuit = circuit.__class__(**tmp_new_circuit.init_kwargs) | ||
| for i, gate in enumerate(tmp_new_circuit.queue): | ||
| if not (i in self.pos_rm_inv_gate): | ||
| new_circuit.add(gate) | ||
|
|
||
| return new_circuit | ||
|
|
||
| def __find_pos_rm(self, newCircuit: Circuit): | ||
| """ | ||
| Identifies and marks pairs of inverse gates that can be removed from the circuit. | ||
|
|
||
| Args: | ||
| newCircuit: The Circuit. | ||
|
|
||
| """ | ||
renatomello marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| previous_gates = [None] * newCircuit.nqubits | ||
|
|
||
| def update_previous_gates(qubits, idx, gate): | ||
| """Helper function to update previous gate tracking.""" | ||
| for q in qubits: | ||
| previous_gates[q] = (idx, gate) | ||
|
|
||
| def clear_previous_gates(qubits): | ||
| """Helper to clear tracking for given qubits.""" | ||
| for q in qubits: | ||
| previous_gates[q] = None | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| for i, gate in enumerate(newCircuit.queue): | ||
|
|
||
| if not gate.init_args: | ||
| continue | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| primary_qubit, *other_qubits = gate.init_args | ||
|
|
||
| if previous_gates[primary_qubit] is None: | ||
| update_previous_gates(gate.init_args, i, gate) | ||
| else: | ||
| if isinstance(gate, self.inverse_gates): | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| same_gate = self.__sameGates(gate, previous_gates[primary_qubit][1]) | ||
|
|
||
| secondary_match = ( | ||
| all( | ||
| previous_gates[q] is not None | ||
| and self.__sameGates(gate, previous_gates[q][1]) | ||
| for q in other_qubits | ||
| ) | ||
| if other_qubits | ||
| else True | ||
| ) | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if same_gate and secondary_match: | ||
| self.pos_rm_inv_gate.extend( | ||
| [previous_gates[primary_qubit][0], i] | ||
| ) | ||
| clear_previous_gates(gate.init_args) | ||
| else: | ||
| update_previous_gates(gate.init_args, i, gate) | ||
| else: | ||
| update_previous_gates(gate.init_args, i, gate) | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @staticmethod | ||
| def __sameGates(gate1, gate2) -> bool: | ||
| """ | ||
| Determines whether two gates are considered the same. | ||
|
|
||
| Args: | ||
| gate1: The first gate. | ||
| gate2: The second gate. | ||
|
|
||
| Returns: | ||
| True if the gates are the same, otherwise False. | ||
| """ | ||
| if gate1 is None or gate2 is None: | ||
| return False | ||
|
|
||
| return (gate1.name, gate1.init_args, gate1.init_kwargs) == ( | ||
| gate2.name, | ||
| gate2.init_args, | ||
| gate2.init_kwargs, | ||
| ) | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class RotationGateFusion(Optimizer): | ||
| """ | ||
| Identifies and fuse rotated gates (RX, RY, RZ) from a quantum circuit. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| self.rotated_gates = gates.RX | gates.RY | gates.RZ | ||
| self.gates = [] | ||
|
|
||
| def __call__(self, circuit: Circuit) -> Circuit: | ||
carlos-luque marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if 0 == circuit.ngates or circuit.gates_of_type(gates.FusedGate): | ||
| return circuit | ||
|
|
||
| tmp_new_circuit = circuit.copy(True) | ||
| self.__find_gates(tmp_new_circuit) | ||
|
|
||
| new_circuit = circuit.__class__(**tmp_new_circuit.init_kwargs) | ||
| for gate in self.gates: | ||
| new_circuit.add(gate) | ||
|
|
||
| return new_circuit | ||
|
|
||
| def __find_gates(self, newCircuit: Circuit): | ||
| """ | ||
| Identifies and accumulates rotation angles for consecutive rotation gates of the same type. | ||
| """ | ||
|
|
||
| previous_gates = [None] * newCircuit.nqubits | ||
|
|
||
| for gate in newCircuit.queue: | ||
|
|
||
| if not gate.qubits: | ||
| continue | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| primary_qubit = gate.init_args[0] | ||
|
|
||
| if isinstance(gate, self.rotated_gates): | ||
| prev_gate = previous_gates[primary_qubit] | ||
| if isinstance(prev_gate, type(gate)): | ||
| tmp_gate = type(gate)( | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| primary_qubit, prev_gate.parameters[0] + gate.parameters[0] | ||
| ) | ||
| previous_gates[primary_qubit] = tmp_gate | ||
| else: | ||
| if isinstance(prev_gate, self.rotated_gates): | ||
| self.gates.append(prev_gate) | ||
| previous_gates[primary_qubit] = gate | ||
| else: | ||
| # Flush stored rotations before adding new gate | ||
| for q in gate.init_args: | ||
| if isinstance(previous_gates[q], self.rotated_gates): | ||
| self.gates.append(previous_gates[q]) | ||
| previous_gates[q] = None | ||
|
|
||
| self.gates.append(gate) | ||
| for q in gate.qubits: | ||
| previous_gates[q] = gate | ||
|
Comment on lines
+229
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can I ask you to add some comments here please? because I kind of get the idea but I am getting lost in all these nested checks. Even some more separation, in the sense of adding auxiliary functions with understandable names, would improve readability.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first and second conditions verify whether the gate pair consists of rotation operators of the same type. |
||
|
|
||
| # Append any remaining rotation gates | ||
| self.gates.extend( | ||
| g for g in previous_gates if isinstance(g, self.rotated_gates) | ||
| ) | ||
|
|
||
|
|
||
| class U3GateFusion(Optimizer): | ||
| """Merge pairs of U3 gates in the circuit""" | ||
|
|
||
| def __init__(self): | ||
| self.gates = [] | ||
|
|
||
| def __call__(self, circuit: Circuit) -> Circuit: | ||
|
|
||
| if 0 == circuit.ngates or circuit.gates_of_type(gates.FusedGate): | ||
| return circuit | ||
|
|
||
| tmp_new_circuit = circuit.copy(True) | ||
| self.__find_gates(tmp_new_circuit) | ||
|
|
||
| new_circuit = circuit.__class__(**tmp_new_circuit.init_kwargs) | ||
|
|
||
| for gate in self.gates: | ||
| new_circuit.add(gate) | ||
|
|
||
| return new_circuit | ||
|
|
||
| def __find_gates(self, newCircuit: Circuit) -> Circuit: | ||
| """ | ||
| Identifies pairs of U3 gates that can be merged | ||
| """ | ||
|
|
||
| previous_gates = [None] * newCircuit.nqubits | ||
|
|
||
| for gate in newCircuit.queue: | ||
|
|
||
| if not gate.qubits: | ||
| continue | ||
|
|
||
| primary_qubit = gate.init_args[0] # Extract primary qubit | ||
|
|
||
| if isinstance(gate, gates.U3): | ||
| prev_gate = previous_gates[primary_qubit] | ||
| if isinstance(prev_gate, gates.U3): | ||
| previous_gates[primary_qubit] = self.__U3Fusion( | ||
| primary_qubit, prev_gate, gate | ||
| ) | ||
| else: | ||
| previous_gates[primary_qubit] = gate | ||
| else: | ||
| # Flush stored U3 before adding new gate | ||
| for q in gate.init_args: | ||
| if isinstance(previous_gates[q], gates.U3): | ||
| self.gates.append(previous_gates[q]) | ||
| previous_gates[q] = None | ||
|
|
||
| self.gates.append(gate) | ||
| for q in gate.qubits: | ||
| previous_gates[q] = gate # Track current gate | ||
|
|
||
| # Append any remaining U3 gates in one pass | ||
| self.gates.extend(g for g in previous_gates if isinstance(g, gates.U3)) | ||
|
Comment on lines
+287
to
+320
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is basically the same routine of the previous gate merger, I think you could avoid repeating this and write a generic class that searches and merges mergeable gates, naturally the rules of mergeability and search are going to be target specific. Now that I think about it, even the cancellation routine could be seen as a special case of this. class GatesFuser(ABC, Optimizer):
# this will be totally generic
def __call__(self, circuit):
...
# these are object specific
@abstractmethod
def _are_fusable(g1, g2):
pass
@abstractmethod
def fuse(g1, g2):
pass
class InverseCancellation(GatesFuser)
...
class RotationFuser(GatesFuser)
...
class U3Fuser(GatesFuser):
...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The third nested condition verifies whether the preceding gate (prev_gate) is a rotation operator of a different type from the current gate (gate). This check is unnecessary in the case of a U3U3 gate merger. I attempted to modify the routine following the approach proposed for inverse gate cancellation in order to simplify the implementation; however, this resulted in the loss of the original gate ordering within the circuit. A generic class appears to be a promising design choice. However, in the current implementation, the only common generic method is call.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah and that is exactly what you want right? The class GatesFuser(ABC, Optimizer):
def __init__(self):
self.gates = []
@property
@abstractmethod
def gate_type(self) -> Type:
pass
def __call__(self, circuit):
circuit = circuit.copy(True)
nqubits = circuit.nqubits
previous_gates = [None] * nqubits
for gate in circuit.queue:
primary_qubit = gate.init_args[0]
if isinstance(gate, self.gate_type):
prev_gate = previous_gates[primary_qubit]
if self.are_fusable(gate, prev_gate):
tmp_gate = self.fuse(gate, prev_gate)
previous_gates[primary_qubit] = tmp_gate
else:
self.not_fusable_post_processing(prev_gate)
previous_gates[primary_qubit] = gate
else:
# Flush stored gates before adding new gate
for q in gate.init_args:
if isinstance(previous_gates[q], self.gate_type):
self.gates.append(previous_gates[q])
previous_gates[q] = None
self.gates.append(gate)
for q in gate.qubits:
previous_gates[q] = gate
# Append any remaining gates in one pass
self.gates.extend(g for g in previous_gates if isinstance(g, self.gate_type))
new_circuit = circuit.__class__(**circuit.init_kwargs)
for gate in self.gates:
new_circuit.add(gate)
return new_circuit
@abstractmethod
def are_fusable(self, g1, g2):
pass
@abstractmethod
def fuse(self, g1, g2):
pass
def not_fusable_post_processing(self, previous_gate):
pass
class RotationFuser(GatesFuser):
def gate_type(self) -> Type:
return gates.RX | gates.RY | gates.RZ
def are_fusable(self, g1, g2):
return isinstance(g1, g2.__class__)
def fuse(self, g1, g2):
return g1.__class__(
g1.init_args[0], g1.parameters[0] + g2[0]
)
def not_fusable_post_processing(self, previous_gate):
if isinstance(previous_gate, self.gate_type):
self.gates.append(previous_gate)
class U3Fuser(GatesFuser):
def gate_type(self) -> Type:
return gates.U3
def are_fusable(self, g1, g2):
return isinstance(g2, gates.U3)
def fuse(self, g1, g2):
qubit = g1.init_args[0]
# your _create_u3_fusion
...
@staticmethod
def _extract_u3_params(unitary_matrix: np.ndarray):
...This is just a quick draft that I made, there surely are improvements to be made in several points, but roughly should give you the idea. The inverse cancellation can be similarly adapted to fit in this design with, in case, some further tuning of the abstract class I believe. I would personally route for this solution (or something similar) due to it's way better maintainability, readability and flexibility. |
||
|
|
||
| @staticmethod | ||
| def __extract_u3_params(U): | ||
carlos-luque marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Extracts the theta, phi, and lambda parameters from a fused U3 unitary matrix.""" | ||
|
|
||
| theta_f = 2 * np.arccos(np.sqrt(np.abs(U[0, 0] * U[1, 1]))) | ||
| if 0 == np.cos(theta_f / 2): | ||
| lambda_f = np.real(-1j * np.log(-1 * U[0, 1])) | ||
| phi_f = np.real(-1j * np.log(U[1, 0])) | ||
| elif 0 == np.sin(theta_f / 2): | ||
| lambda_f = np.real(-1j * np.log(U[1, 1])) | ||
| phi_f = np.real(1j * np.log(U[0, 0])) | ||
| else: | ||
| phi_f = np.real( | ||
| -1j | ||
| * np.log( | ||
| (U[1, 0] * U[1, 1]) / (np.sin(theta_f / 2) * np.cos(theta_f / 2)) | ||
| ) | ||
| ) | ||
| lambda_f = np.real( | ||
| -1j | ||
| * np.log( | ||
| (np.sin(theta_f / 2) * U[1, 1]) / (U[1, 0] * np.cos(theta_f / 2)) | ||
| ) | ||
| ) | ||
|
|
||
| return theta_f, phi_f, lambda_f | ||
|
|
||
| @staticmethod | ||
| def __U3Fusion(qubit, gate1, gate2) -> gates: | ||
| u_final = np.dot(gate2.matrix(), gate1.matrix()) | ||
| theta, phi, lam = U3GateFusion.__extract_u3_params(u_final) | ||
| return gates.U3(qubit, theta, phi, lam) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering if there was an easier (less convoluted and slightly more readable) way to implement this.
Namely, I would probably first identify which gates act on which qubits and build a mapping from the qubit to the index in the queue. Then for each qubit I would scan for gate pairs that cancel out and append them to the list of gates to remove (as you already do). Something like this:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This proposal is valid in general, but it fails in the presence of N-qubit gates.
For instance, consider a three-qubit circuit with the sequence: CNOT(0,1), X(1), Y(2), CNOT(0,1).
The transformation yields X(1) Y(2), which is incorrect.