Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
from urllib.parse import urlparse
import zipfile

from requests import get
from requests import session
from requests.auth import HTTPBasicAuth

from elyra.pipeline.catalog_connector import AirflowEntryData
from elyra.pipeline.catalog_connector import ComponentCatalogConnector
from elyra.pipeline.catalog_connector import EntryData
from elyra.util.url import FileTransportAdapter


class AirflowPackageCatalogConnector(ComponentCatalogConnector):
Expand Down Expand Up @@ -74,20 +75,22 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str
)
return operator_key_list

# determine whether authentication needs to be performed
auth_id = catalog_metadata.get("auth_id")
auth_password = catalog_metadata.get("auth_password")
if auth_id and auth_password:
auth = HTTPBasicAuth(auth_id, auth_password)
elif auth_id or auth_password:
self.log.error(
f"Error. Airflow connector '{catalog_metadata.get('display_name')}' "
"is not configured properly. "
"Authentication requires a user id and password or API key."
)
return operator_key_list
else:
auth = None
pr = urlparse(airflow_package_download_url)
auth = None

if pr.scheme != "file":
# determine whether authentication needs to be performed
auth_id = catalog_metadata.get("auth_id")
auth_password = catalog_metadata.get("auth_password")
if auth_id and auth_password:
auth = HTTPBasicAuth(auth_id, auth_password)
elif auth_id or auth_password:
self.log.error(
f"Error. Airflow connector '{catalog_metadata.get('display_name')}' "
"is not configured properly. "
"Authentication requires a user id and password or API key."
)
return operator_key_list

# tmp_archive_dir is used to store the downloaded archive and as working directory
if hasattr(self, "tmp_archive_dir"):
Expand All @@ -100,7 +103,10 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

# download archive; abort after 30 seconds
try:
response = get(
requests_session = session()
if pr.scheme == "file":
requests_session.mount("file://", FileTransportAdapter())
response = requests_session.get(
airflow_package_download_url,
timeout=AirflowPackageCatalogConnector.REQUEST_TIMEOUT,
allow_redirects=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
from urllib.parse import urlparse
import zipfile

from requests import get
from requests import session
from requests.auth import HTTPBasicAuth

from elyra.pipeline.catalog_connector import AirflowEntryData
from elyra.pipeline.catalog_connector import ComponentCatalogConnector
from elyra.pipeline.catalog_connector import EntryData
from elyra.util.url import FileTransportAdapter


class AirflowProviderPackageCatalogConnector(ComponentCatalogConnector):
Expand Down Expand Up @@ -79,20 +80,22 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str
)
return operator_key_list

# determine whether authentication needs to be performed
auth_id = catalog_metadata.get("auth_id")
auth_password = catalog_metadata.get("auth_password")
if auth_id and auth_password:
auth = HTTPBasicAuth(auth_id, auth_password)
elif auth_id or auth_password:
self.log.error(
f"Error. Airflow provider package connector '{catalog_metadata.get('display_name')}' "
"is not configured properly. "
"Authentication requires a user id and password or API key."
)
return operator_key_list
else:
auth = None
pr = urlparse(airflow_provider_package_download_url)
auth = None

if pr.scheme != "file":
# determine whether authentication needs to be performed
auth_id = catalog_metadata.get("auth_id")
auth_password = catalog_metadata.get("auth_password")
if auth_id and auth_password:
auth = HTTPBasicAuth(auth_id, auth_password)
elif auth_id or auth_password:
self.log.error(
f"Error. Airflow provider package connector '{catalog_metadata.get('display_name')}' "
"is not configured properly. "
"Authentication requires a user id and password or API key."
)
return operator_key_list

# tmp_archive_dir is used to store the downloaded archive and as working directory
if hasattr(self, "tmp_archive_dir"):
Expand All @@ -105,7 +108,10 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

# download archive
try:
response = get(
requests_session = session()
if pr.scheme == "file":
requests_session.mount("file://", FileTransportAdapter())
response = requests_session.get(
airflow_provider_package_download_url,
timeout=AirflowProviderPackageCatalogConnector.REQUEST_TIMEOUT,
allow_redirects=True,
Expand Down
40 changes: 23 additions & 17 deletions elyra/pipeline/catalog_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urlparse

from deprecation import deprecated
from jupyter_core.paths import ENV_JUPYTER_PATH
from requests import get
from requests import session
from requests.auth import HTTPBasicAuth
from traitlets.config import LoggingConfigurable
from traitlets.traitlets import default
Expand All @@ -40,6 +41,7 @@
from elyra.pipeline.component import Component
from elyra.pipeline.component import ComponentParameter
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.url import FileTransportAdapter


class EntryData(object):
Expand Down Expand Up @@ -636,24 +638,28 @@ def get_entry_data(
individual catalog entries
"""
url = catalog_entry_data.get("url")

# determine whether authentication needs to be performed
auth_id = catalog_metadata.get("auth_id")
auth_password = catalog_metadata.get("auth_password")
if auth_id and auth_password:
auth = HTTPBasicAuth(auth_id, auth_password)
elif auth_id or auth_password:
self.log.error(
f"Error. URL catalog connector '{catalog_metadata.get('display_name')}' "
"is not configured properly. "
"Authentication requires a user id and password or API key."
)
return None
else:
auth = None
pr = urlparse(url)
auth = None

if pr.scheme != "file":
# determine whether authentication needs to be performed
auth_id = catalog_metadata.get("auth_id")
auth_password = catalog_metadata.get("auth_password")
if auth_id and auth_password:
auth = HTTPBasicAuth(auth_id, auth_password)
elif auth_id or auth_password:
self.log.error(
f"Error. URL catalog connector '{catalog_metadata.get('display_name')}' "
"is not configured properly. "
"Authentication requires a user id and password or API key."
)
return None

try:
res = get(
requests_session = session()
if pr.scheme == "file":
requests_session.mount("file://", FileTransportAdapter())
res = requests_session.get(
url,
timeout=UrlComponentCatalogConnector.REQUEST_TIMEOUT,
allow_redirects=True,
Expand Down
65 changes: 65 additions & 0 deletions elyra/util/url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pathlib import Path
from urllib.request import url2pathname

from requests import Response
from requests.adapters import BaseAdapter


class FileTransportAdapter(BaseAdapter):
    """
    File Transport Adapter for the requests library. Use this
    adapter to enable the requests library to load a resource
    from the 'file' scheme using HTTP 'GET'.

    The adapter is activated by mounting it on a session for the
    'file://' prefix. Every response it produces carries its body
    in memory and is flagged as already consumed, so it behaves the
    same whether or not the caller requested streaming.
    """

    def send(self, req, **kwargs):
        """
        Return the file specified by the given request

        :param req: the prepared request; its 'method', 'url' and
            'path_url' attributes are used
        :param kwargs: transport options passed by the session (timeout,
            verify, ...); accepted for interface compatibility and ignored
        :returns: a Response whose status code is 200 (body holds the
            file content), 405 (method is not GET), 400 (path refers
            to a directory) or 404 (path is not an existing file)
        :raises OSError: if the file exists but cannot be opened or read
        """

        response = Response()
        response.request = req
        response.connection = self
        # the request URL may be bytes; the response URL is always text
        if isinstance(req.url, bytes):
            response.url = req.url.decode("utf-8")
        else:
            response.url = req.url

        if req.method.lower() != "get":
            return self._complete(response, 405, "Method not allowed")

        # url2pathname converts the (percent-encoded) URL path into a
        # path in the local operating system's notation
        p = Path(url2pathname(req.path_url))
        if p.is_dir():
            return self._complete(response, 400, "Not a file")
        if not p.is_file():
            return self._complete(response, 404, "File not found")

        with open(p, "rb") as fh:
            content = fh.read()

        # 'reason' is intentionally left unset for successful responses
        return self._complete(response, 200, content=content)

    @staticmethod
    def _complete(response, status_code, reason=None, content=b""):
        """
        Set status, reason and in-memory body on the response and return it

        No underlying raw stream exists for these responses. Marking the
        body as consumed makes Response.iter_content() serve the in-memory
        bytes and keeps Response.close() from touching the missing stream,
        which would otherwise fail when the caller uses stream=True.
        """
        response.status_code = status_code
        response.reason = reason
        response._content = content
        response._content_consumed = True
        return response

    def close(self):
        """
        Release adapter resources; nothing is held open, so this is a no-op
        """
        pass