Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/buildAndTestRyzenAI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ jobs:

ninja install
ninja check-aie
ninja check-aie-concurrency
popd

build-quick-setup:
Expand Down
133 changes: 98 additions & 35 deletions python/iron/jit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import numpy as np
import pyxrt as xrt
import shutil
import sys
import traceback
import fcntl
import contextlib
import time

from aie.extras.context import mlir_mod_ctx
from ..utils.xrt import read_insts_binary
Expand All @@ -29,6 +30,66 @@
IRON_CACHE_HOME = os.environ.get("IRON_CACHE_HOME", os.path.expanduser("~/.iron/cache"))


def _cleanup_failed_compilation(cache_dir):
"""Clean up cache directory after failed compilation, preserving the lock file."""
if not os.path.exists(cache_dir):
return

for item in os.listdir(cache_dir):
if item == ".lock":
continue
item_path = os.path.join(cache_dir, item)
if os.path.isfile(item_path):
os.remove(item_path)
elif os.path.isdir(item_path):
shutil.rmtree(item_path)


@contextlib.contextmanager
def file_lock(lock_file_path, timeout_seconds=60):
    """
    Context manager that serializes access across processes via flock(2).

    Polls every 100 ms until an exclusive lock on *lock_file_path* is
    acquired, yields the open lock file, and releases the lock on exit.

    Args:
        lock_file_path (str): Path to the lock file (created if missing).
        timeout_seconds (int): Maximum time to wait for lock acquisition
            in seconds.

    Raises:
        TimeoutError: If the lock cannot be acquired within
            ``timeout_seconds``.
    """
    lock_dir = os.path.dirname(lock_file_path)
    if lock_dir:
        os.makedirs(lock_dir, exist_ok=True)
    # Append mode creates the file if it does not exist without truncating
    # an existing one, so no separate O_CREAT | O_EXCL dance is needed.
    lock_file = open(lock_file_path, "a")
    try:
        # Try to acquire an exclusive lock, non-blocking, until the deadline.
        deadline = time.time() + timeout_seconds
        while True:
            try:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except OSError:
                # Lock is held by another process
                if time.time() > deadline:
                    raise TimeoutError(
                        f"Could not acquire lock on {lock_file_path} within {timeout_seconds} seconds"
                    )
                time.sleep(0.1)

        yield lock_file
    finally:
        # Closing the only descriptor we hold on the lock file releases the
        # flock (see flock(2)), so an explicit LOCK_UN beforehand would be
        # redundant.
        lock_file.close()


class CircularCache:
def __init__(self, max_size):
self.max_size = max_size
Expand Down Expand Up @@ -252,41 +313,43 @@ def decorator(*args, **kwargs):
# Hash of the IR string, ExternalFunction compiler options, and target architecture
module_hash = hash_module(mlir_module, external_kernels, target_arch)
kernel_dir = os.path.join(IRON_CACHE_HOME, f"{module_hash}")
lock_file_path = os.path.join(kernel_dir, ".lock")
mlir_path = os.path.join(kernel_dir, "aie.mlir")

# Ensure cache directory exists
os.makedirs(kernel_dir, exist_ok=True)

# Write MLIR to file if not already cached
inst_filename = "insts.bin"
xclbin_filename = "final.xclbin"
xclbin_path = os.path.join(kernel_dir, xclbin_filename)
inst_path = os.path.join(kernel_dir, inst_filename)

xclbin_exists = os.path.exists(xclbin_path)
inst_exists = os.path.exists(inst_path)

if not use_cache or not xclbin_exists or not inst_exists:
try:
with open(mlir_path, "w", encoding="utf-8") as f:
print(mlir_module, file=f)

# Compile ExternalFunctions from inside the JIT compilation directory
for func in external_kernels:
compile_external_kernel(func, kernel_dir, target_arch)

# Compile the MLIR module
compile_mlir_module(
mlir_module=mlir_module,
insts_path=inst_path,
xclbin_path=xclbin_path,
work_dir=kernel_dir,
)
except Exception as e:
# Clean up cache directory on any compilation failure to avoid any corrupted objects in the cache
if os.path.exists(kernel_dir):
shutil.rmtree(kernel_dir)
raise e
# Use file locking to prevent race conditions when accessing cache directory
with file_lock(lock_file_path):
# Ensure cache directory exists
os.makedirs(kernel_dir, exist_ok=True)

# Write MLIR to file if not already cached
inst_filename = "insts.bin"
xclbin_filename = "final.xclbin"
xclbin_path = os.path.join(kernel_dir, xclbin_filename)
inst_path = os.path.join(kernel_dir, inst_filename)

xclbin_exists = os.path.exists(xclbin_path)
inst_exists = os.path.exists(inst_path)

if not use_cache or not xclbin_exists or not inst_exists:
try:
with open(mlir_path, "w", encoding="utf-8") as f:
print(mlir_module, file=f)

# Compile ExternalFunctions from inside the JIT compilation directory
for func in external_kernels:
compile_external_kernel(func, kernel_dir, target_arch)

# Compile the MLIR module
compile_mlir_module(
mlir_module=mlir_module,
insts_path=inst_path,
xclbin_path=xclbin_path,
work_dir=kernel_dir,
)
except Exception as e:
# Clean up cache directory on any compilation failure to avoid any corrupted objects in the cache
_cleanup_failed_compilation(kernel_dir)
raise e

kernel_name = "MLIR_AIE"
try:
Expand Down
11 changes: 9 additions & 2 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,15 @@ set(TEST_DEPENDS
add_lit_testsuite(check-aie "Running the aie regression tests"
${CMAKE_CURRENT_BINARY_DIR}
DEPENDS ${TEST_DEPENDS}
ARGS "-sv --timeout 600"
ARGS "-sv --timeout 600 --filter-out python-concurrency"
)
set_target_properties(check-aie PROPERTIES FOLDER "Tests")

add_subdirectory(CppTests)
add_lit_testsuite(check-aie-concurrency "Running the aie concurrency tests"
${CMAKE_CURRENT_BINARY_DIR}/python-concurrency
DEPENDS ${TEST_DEPENDS}
ARGS "-sv --timeout 600"
)
set_target_properties(check-aie-concurrency PROPERTIES FOLDER "Tests")

add_subdirectory(CppTests)
3 changes: 3 additions & 0 deletions test/lit.cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ def prepend_path(path):
lit_config.parallelism_groups["board"] = 1
config.parallelism_group = "board"

# Concurrency tests control their own parallelism, so run them serially
lit_config.parallelism_groups["concurrency"] = 1

if "LIT_AVAILABLE_FEATURES" in os.environ:
for feature in os.environ["LIT_AVAILABLE_FEATURES"].split():
config.available_features.add(feature)
168 changes: 168 additions & 0 deletions test/python-concurrency/jit_parallel_compilation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# This file is licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# (c) Copyright 2025 AMD Inc.

# RUN: %run_on_npu1% %pytest %s
# RUN: %run_on_npu2% %pytest %s

import pytest
import numpy as np
import os
import tempfile
import subprocess
import sys


def test_parallel_compilation_subprocess():
    """
    Test parallel JIT compilation using subprocesses.

    Spawns multiple processes that compile the same kernel concurrently,
    sharing one cache directory, to ensure the file locking mechanism
    around the JIT cache works correctly.
    """

    # Create a temporary cache directory for this test
    with tempfile.TemporaryDirectory() as temp_cache_dir:
        # A standalone script that triggers one JIT compilation of a simple
        # vector-add kernel; each subprocess runs a copy of it.
        test_script = """
import sys
import numpy as np
import aie.iron as iron
from aie.iron import ObjectFifo, Program, Runtime, Worker
from aie.iron.placers import SequentialPlacer
from aie.iron.controlflow import range_

@iron.jit(is_placed=False)
def simple_add(input0, input1, output):
    if input0.shape != input1.shape:
        raise ValueError(f"Input shapes are not equal ({input0.shape} != {input1.shape}).")
    if input0.shape != output.shape:
        raise ValueError(f"Input and output shapes are not equal ({input0.shape} != {output.shape}).")
    if len(np.shape(input0)) != 1:
        raise ValueError("Function only supports vectors.")
    num_elements = np.size(input0)
    n = 16
    if num_elements % n != 0:
        raise ValueError(f"Number of elements ({num_elements}) must be a multiple of {n}.")
    N_div_n = num_elements // n

    if input0.dtype != input1.dtype:
        raise ValueError(f"Input data types are not the same ({input0.dtype} != {input1.dtype}).")
    if input0.dtype != output.dtype:
        raise ValueError(f"Input and output data types are not the same ({input0.dtype} != {output.dtype}).")
    dtype = input0.dtype

    # Define tensor types
    tensor_ty = np.ndarray[(num_elements,), np.dtype[dtype]]
    tile_ty = np.ndarray[(n,), np.dtype[dtype]]

    # AIE-array data movement with object fifos
    of_in1 = ObjectFifo(tile_ty, name="in1")
    of_in2 = ObjectFifo(tile_ty, name="in2")
    of_out = ObjectFifo(tile_ty, name="out")

    # Define a task that will run on a compute tile
    def core_body(of_in1, of_in2, of_out):
        # Number of sub-vector "tile" iterations
        for _ in range_(N_div_n):
            elem_in1 = of_in1.acquire(1)
            elem_in2 = of_in2.acquire(1)
            elem_out = of_out.acquire(1)
            for i in range_(n):
                elem_out[i] = elem_in1[i] + elem_in2[i]
            of_in1.release(1)
            of_in2.release(1)
            of_out.release(1)

    # Create a worker to run the task on a compute tile
    worker = Worker(core_body, fn_args=[of_in1.cons(), of_in2.cons(), of_out.prod()])

    # Runtime operations to move data to/from the AIE-array
    rt = Runtime()
    with rt.sequence(tensor_ty, tensor_ty, tensor_ty) as (A, B, C):
        rt.start(worker)
        rt.fill(of_in1.prod(), A)
        rt.fill(of_in2.prod(), B)
        rt.drain(of_out.cons(), C, wait=True)

    # Place program components (assign them resources on the device) and generate an MLIR module
    return Program(iron.get_current_device(), rt).resolve_program(SequentialPlacer())

# Test the compilation
try:
    num_elements = 16
    dtype = np.int32
    input0 = iron.randint(1, 100, (num_elements,), dtype=dtype, device="npu")
    input1 = iron.randint(1, 100, (num_elements,), dtype=dtype, device="npu")
    output = iron.zeros_like(input0)

    # This should trigger JIT compilation and cache access
    simple_add(input0, input1, output)
    print("SUCCESS")
except Exception as e:
    print(f"ERROR: {type(e).__name__}: {str(e)}")
    sys.exit(1)
"""

        # Write the test script to a temporary file
        script_path = os.path.join(temp_cache_dir, "test_compilation.py")
        with open(script_path, "w") as f:
            f.write(test_script)

        # Run multiple subprocesses concurrently, all pointed at the same
        # cache directory so they race on the same cache entry.
        num_processes = 5
        processes = []

        for i in range(num_processes):
            env = os.environ.copy()
            env["IRON_CACHE_HOME"] = temp_cache_dir
            process = subprocess.Popen(
                [sys.executable, script_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
            )
            processes.append(process)

        # Wait for all processes to complete and collect return codes.
        # The timeout guards against a deadlock in the locking code
        # hanging this test (and CI) indefinitely.
        return_codes = []
        process_outputs = []

        for i, process in enumerate(processes):
            try:
                stdout, stderr = process.communicate(timeout=600)
            except subprocess.TimeoutExpired:
                # Kill the stuck process and collect whatever it produced;
                # its non-zero return code is reported below.
                process.kill()
                stdout, stderr = process.communicate()
            return_codes.append(process.returncode)
            process_outputs.append((stdout, stderr))

            # Print output for each process for debugging
            print(f"\n=== Process {i} (return code: {process.returncode}) ===")
            print(f"STDOUT:\n{stdout}")
            print(f"STDERR:\n{stderr}")
            print("=" * 50)

        # Count successful processes (return code 0)
        successful_processes = sum(1 for code in return_codes if code == 0)

        # Verify that all processes completed
        assert len(return_codes) == num_processes, "All processes should complete"

        # Check if any concurrent compilation failed
        if successful_processes < num_processes:
            # Create detailed error message with all process outputs
            error_msg = (
                f"Only {successful_processes}/{num_processes} processes succeeded\n\n"
            )
            error_msg += "Process details:\n"

            for i, (return_code, (stdout, stderr)) in enumerate(
                zip(return_codes, process_outputs)
            ):
                status = "SUCCESS" if return_code == 0 else "FAILED"
                error_msg += f"\nProcess {i}: {status} (return code: {return_code})\n"
                if stdout:
                    error_msg += f"  STDOUT: {stdout.strip()}\n"
                if stderr:
                    error_msg += f"  STDERR: {stderr.strip()}\n"

            pytest.fail(error_msg)
13 changes: 13 additions & 0 deletions test/python-concurrency/lit.local.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# This file is licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# (c) Copyright 2025 AMD Inc.

# Concurrency tests need to control their own parallelism, so we disable
# inter-test concurrency for this directory by placing its tests in a
# parallelism group (capped at 1 in the top-level lit configuration).
config.parallelism_group = "concurrency"

# Expand %pytest in RUN lines to `pytest -rA`
config.substitutions.append(("%pytest", "pytest -rA"))

Loading