-
Notifications
You must be signed in to change notification settings - Fork 13
Async Queue | using signed urls from objects so client can downlaod directly from object store #210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,14 +61,24 @@ def connect(cls): | |
| host, port = cls.get_host_port() | ||
| if not verify_connection(host, port): | ||
| raise ConnectionError(f"Ray is not listening on {host}:{port}") | ||
|
|
||
| logger.info(f"Connecting to Ray at {cls.ray_url}...") | ||
| ray.init(logging_level="error", address=cls.ray_url) | ||
| logger.info("Connected to Ray") | ||
|
|
||
| @classmethod | ||
| def connected(cls) -> bool: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just so you are aware, I profiled this method and the added
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which is fine because connected is only called if theres some critical error in the first place. I think its good to know youre truly connected if you can access the controller. Also makes it so if you run a test script right after brining it up, you wont get the error saying controller does not exist |
||
| return ray.is_initialized() and cls.is_listening() | ||
|
|
||
| connected = ray.is_initialized() and cls.is_listening() | ||
|
|
||
| if connected: | ||
|
|
||
| try: | ||
| ray.get_actor("Controller", namespace="NDIF") | ||
| except: | ||
| return False | ||
| else: | ||
| return True | ||
|
|
||
| return False | ||
|
|
||
|
|
||
| @classmethod | ||
| def reset(cls): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,13 @@ | ||
| from typing import ClassVar | ||
| from pydantic import ConfigDict | ||
| from pydantic import ConfigDict | ||
| from .mixins import ObjectStorageMixin | ||
|
|
||
| from ..providers.objectstore import ObjectStoreProvider | ||
| class BackendResultModel(ObjectStorageMixin): | ||
|
|
||
| model_config = ConfigDict(extra='allow') | ||
| model_config = ConfigDict(extra='allow', validate_assignment=False, frozen=False, arbitrary_types_allowed=True, str_strip_whitespace=False, strict=False) | ||
|
|
||
| _bucket_name: ClassVar[str] = "dev-ndif-results" | ||
| _file_extension: ClassVar[str] = "pt" | ||
|
|
||
| def url(self) -> str: | ||
| return self._url(ObjectStoreProvider.object_store) | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| from multiprocessing import Process | ||
| from src.queue.coordinator import Coordinator | ||
| from src.queue.dispatcher import Dispatcher | ||
|
|
||
| def on_starting(server): | ||
|
|
||
| Process(target=Coordinator.start, daemon=False).start() | ||
| Process(target=Dispatcher.start, daemon=False).start() |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logging is useful for me, so I'm adding it back in