Skip to content

Commit de9148b

Browse files
committed
fix: handle big parameters of failed step
fix: add sub_path and slice to download_artifact fix: data persistent for minio Signed-off-by: zjgemi <[email protected]>
1 parent 8ef468a commit de9148b

File tree

4 files changed

+74
-41
lines changed

4 files changed

+74
-41
lines changed

manifests/quick-start-postgres-stable-cn.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1692,6 +1692,9 @@ spec:
16921692
- mkdir
16931693
- -p
16941694
- /data/my-bucket
1695+
volumeMounts:
1696+
- mountPath: /data
1697+
name: minio-data
16951698
livenessProbe:
16961699
httpGet:
16971700
path: /minio/health/live
@@ -1710,6 +1713,11 @@ spec:
17101713
port: 9000
17111714
initialDelaySeconds: 5
17121715
periodSeconds: 10
1716+
volumes:
1717+
- hostPath:
1718+
path: /data/minio
1719+
type: DirectoryOrCreate
1720+
name: minio-data
17131721
---
17141722
apiVersion: apps/v1
17151723
kind: Deployment

manifests/quick-start-postgres.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1692,6 +1692,9 @@ spec:
16921692
- mkdir
16931693
- -p
16941694
- /data/my-bucket
1695+
volumeMounts:
1696+
- mountPath: /data
1697+
name: minio-data
16951698
livenessProbe:
16961699
httpGet:
16971700
path: /minio/health/live
@@ -1710,6 +1713,11 @@ spec:
17101713
port: 9000
17111714
initialDelaySeconds: 5
17121715
periodSeconds: 10
1716+
volumes:
1717+
- hostPath:
1718+
path: /data/minio
1719+
type: DirectoryOrCreate
1720+
name: minio-data
17131721
---
17141722
apiVersion: apps/v1
17151723
kind: Deployment

src/dflow/argo_objects.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,26 @@ def handle_big_parameters(self, io):
106106
if name[13:] in io.parameters:
107107
continue
108108
with tempfile.TemporaryDirectory() as tmpdir:
109-
download_artifact(art, path=tmpdir)
110-
fs = os.listdir(tmpdir)
111-
assert len(fs) == 1
112-
with open(os.path.join(tmpdir, fs[0]), "r") as f:
113-
content = jsonpickle.loads(f.read())
114-
param = {"name": name[13:],
115-
"save_as_artifact": True}
116-
if "type" in content:
117-
param["type"] = content["type"]
118-
if "type" in content and content["type"] != \
119-
str(str):
120-
param["value"] = jsonpickle.loads(
121-
content["value"])
122-
else:
123-
param["value"] = content["value"]
124-
io.parameters[name[13:]] = ArgoObjectDict(param)
109+
try:
110+
download_artifact(art, path=tmpdir)
111+
fs = os.listdir(tmpdir)
112+
assert len(fs) == 1
113+
with open(os.path.join(tmpdir, fs[0]), "r") as f:
114+
content = jsonpickle.loads(f.read())
115+
param = {"name": name[13:],
116+
"save_as_artifact": True}
117+
if "type" in content:
118+
param["type"] = content["type"]
119+
if "type" in content and content["type"] != \
120+
str(str):
121+
param["value"] = jsonpickle.loads(
122+
content["value"])
123+
else:
124+
param["value"] = content["value"]
125+
io.parameters[name[13:]
126+
] = ArgoObjectDict(param)
127+
except Exception:
128+
pass
125129

126130
def modify_output_parameter(
127131
self,

src/dflow/utils.py

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
def download_artifact(
3636
artifact,
3737
extract: bool = True,
38+
sub_path: str = None,
39+
slice: int = None,
3840
**kwargs,
3941
) -> List[str]:
4042
"""
@@ -51,34 +53,45 @@ def download_artifact(
5153
bucket_name: bucket name for Minio
5254
"""
5355
if hasattr(artifact, "s3"):
54-
if hasattr(artifact, "archive") and hasattr(artifact.archive, "none")\
55-
and artifact.archive.none is not None:
56-
path = download_s3(key=artifact.s3.key, recursive=True, **kwargs)
57-
else:
58-
path = download_s3(key=artifact.s3.key, recursive=False, **kwargs)
59-
if path[-4:] == ".tgz" and extract:
60-
tf = tarfile.open(path, "r:gz")
61-
with tempfile.TemporaryDirectory() as tmpdir:
62-
tf.extractall(tmpdir)
63-
tf.close()
64-
65-
os.remove(path)
66-
path = os.path.dirname(path)
67-
68-
# if the artifact contains only one directory, merge the
69-
# directory with the target directory
70-
ld = os.listdir(tmpdir)
71-
if len(ld) == 1 and os.path.isdir(os.path.join(tmpdir,
72-
ld[0])):
73-
merge_dir(os.path.join(tmpdir, ld[0]), path)
74-
else:
75-
merge_dir(tmpdir, path)
76-
77-
remove_empty_dir_tag(path)
78-
return assemble_path_list(path, remove=True)
56+
key = artifact.s3.key
57+
elif hasattr(artifact, "key"):
58+
key = artifact.key
7959
else:
8060
raise NotImplementedError()
8161

62+
if slice is not None:
63+
sub_path = path_list_of_artifact(artifact)[slice]
64+
65+
if sub_path is not None:
66+
key = key + "/" + sub_path
67+
if "path" in kwargs:
68+
kwargs["path"] = os.path.join(kwargs["path"],
69+
os.path.dirname(sub_path))
70+
else:
71+
kwargs["path"] = os.path.join(".", os.path.dirname(sub_path))
72+
73+
path = download_s3(key=key, recursive=True, **kwargs)
74+
if key[-4:] == ".tgz" and extract:
75+
path = os.path.join(path, os.path.basename(key))
76+
tf = tarfile.open(path, "r:gz")
77+
with tempfile.TemporaryDirectory() as tmpdir:
78+
tf.extractall(tmpdir)
79+
tf.close()
80+
81+
os.remove(path)
82+
path = os.path.dirname(path)
83+
84+
# if the artifact contains only one directory, merge the
85+
# directory with the target directory
86+
ld = os.listdir(tmpdir)
87+
if len(ld) == 1 and os.path.isdir(os.path.join(tmpdir, ld[0])):
88+
merge_dir(os.path.join(tmpdir, ld[0]), path)
89+
else:
90+
merge_dir(tmpdir, path)
91+
92+
remove_empty_dir_tag(path)
93+
return assemble_path_list(path, remove=True)
94+
8295

8396
def upload_artifact(
8497
path: Union[os.PathLike, List[os.PathLike], Set[os.PathLike]],

0 commit comments

Comments
 (0)