Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 88 additions & 24 deletions ld_openfeature/provider.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import threading
from typing import Any, List, Optional, Union

from ldclient import LDClient, Config
from ldclient.interfaces import DataSourceStatus, FlagChange, DataSourceState
from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import ErrorCode
from openfeature.exception import ErrorCode, ProviderFatalError
from openfeature.flag_evaluation import FlagResolutionDetails, FlagType, Reason
from openfeature.hook import Hook
from openfeature.provider.metadata import Metadata
from openfeature.provider.provider import AbstractProvider
from openfeature.provider import AbstractProvider
from openfeature.event import ProviderEventDetails

from ld_openfeature.impl.context_converter import EvaluationContextConverter
from ld_openfeature.impl.details_converter import ResolutionDetailsConverter
Expand All @@ -19,7 +22,60 @@ def __init__(self, config: Config):
self.__context_converter = EvaluationContextConverter()
self.__details_converter = ResolutionDetailsConverter()

def __handle_data_source_status(self, status: DataSourceStatus):
state = status.state
if state == DataSourceState.INITIALIZING:
return
elif state == DataSourceState.VALID:
self.emit_provider_ready(ProviderEventDetails())
elif state == DataSourceState.OFF:
error_message = self.__get_message(status,
"the provider has encountered a permanent error or has been shutdown")
self.emit_provider_error(ProviderEventDetails(error_code=ErrorCode.PROVIDER_FATAL,
message=error_message))
elif state == DataSourceState.INTERRUPTED:
error_message = self.__get_message(status, "encountered an unknown error")
self.emit_provider_stale(ProviderEventDetails(message=error_message))

# For now treat an unknown state as no change.

def __handle_flag_change(self, change: FlagChange):
self.emit_provider_configuration_changed(ProviderEventDetails(flags_changed=[change.key]))
pass

def initialize(self, evaluation_context: EvaluationContext):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should handle the cases where the client failed or succeeded within the start_wait period, or where it exceeded that period and the data source later becomes VALID or OFF.

ready_event = threading.Event()

def ready_handler(status: DataSourceStatus):
if status.state == DataSourceState.VALID:
ready_event.set()
elif status.state == DataSourceState.OFF:
ready_event.set()

# We listen just to handle the ready event. We do not emit events because the client emits them for us.
self.__client.data_source_status_provider.add_listener(ready_handler)

# Check for conditions that may have happened before we added the listener.
if self.__client.data_source_status_provider.status.state == DataSourceState.OFF:
ready_event.set()

if self.__client.is_initialized():
ready_event.set()

ready_event.wait()

self.__client.data_source_status_provider.remove_listener(ready_handler)

if not self.__client.is_initialized():
raise ProviderFatalError(error_message="launchdarkly client initialization failed")

# Listen to new status events and emit them.
self.__client.data_source_status_provider.add_listener(self.__handle_data_source_status)
self.__client.flag_tracker.add_listener(self.__handle_flag_change)

def shutdown(self):
self.__client.data_source_status_provider.remove_listener(self.__handle_data_source_status)
self.__client.flag_tracker.remove_listener(self.__handle_flag_change)
self.__client.close()

def get_metadata(self) -> Metadata:
Expand All @@ -29,51 +85,52 @@ def get_provider_hooks(self) -> List[Hook]:
return []

def resolve_boolean_details(
self,
flag_key: str,
default_value: bool,
evaluation_context: Optional[EvaluationContext] = None,
self,
flag_key: str,
default_value: bool,
evaluation_context: Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[bool]:
"""Resolves the flag value for the provided flag key as a boolean"""
return self.__resolve_value(FlagType(FlagType.BOOLEAN), flag_key, default_value, evaluation_context)

def resolve_string_details(
self,
flag_key: str,
default_value: str,
evaluation_context: Optional[EvaluationContext] = None,
self,
flag_key: str,
default_value: str,
evaluation_context: Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[str]:
"""Resolves the flag value for the provided flag key as a string"""
return self.__resolve_value(FlagType(FlagType.STRING), flag_key, default_value, evaluation_context)

def resolve_integer_details(
self,
flag_key: str,
default_value: int,
evaluation_context: Optional[EvaluationContext] = None,
self,
flag_key: str,
default_value: int,
evaluation_context: Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]:
"""Resolves the flag value for the provided flag key as a integer"""
return self.__resolve_value(FlagType(FlagType.INTEGER), flag_key, default_value, evaluation_context)

def resolve_float_details(
self,
flag_key: str,
default_value: float,
evaluation_context: Optional[EvaluationContext] = None,
self,
flag_key: str,
default_value: float,
evaluation_context: Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]:
"""Resolves the flag value for the provided flag key as a float"""
return self.__resolve_value(FlagType(FlagType.FLOAT), flag_key, default_value, evaluation_context)

def resolve_object_details(
self,
flag_key: str,
default_value: Union[dict, list],
evaluation_context: Optional[EvaluationContext] = None,
self,
flag_key: str,
default_value: Union[dict, list],
evaluation_context: Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[Union[dict, list]]:
"""Resolves the flag value for the provided flag key as a list or dictionary"""
return self.__resolve_value(FlagType(FlagType.OBJECT), flag_key, default_value, evaluation_context)

def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any, evaluation_context: Optional[EvaluationContext] = None) -> FlagResolutionDetails:
def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any,
evaluation_context: Optional[EvaluationContext] = None) -> FlagResolutionDetails:
if evaluation_context is None:
return FlagResolutionDetails(
value=default_value,
Expand Down Expand Up @@ -103,9 +160,16 @@ def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any

return self.__details_converter.to_resolution_details(result)

def __mismatched_type_details(self, default_value: Any) -> FlagResolutionDetails:
@staticmethod
def __mismatched_type_details(default_value: Any) -> FlagResolutionDetails:
return FlagResolutionDetails(
value=default_value,
reason=Reason(Reason.ERROR),
error_code=ErrorCode.TYPE_MISMATCH
)

@staticmethod
def __get_message(status: DataSourceStatus, default: str):
if status.error and status.error.message:
return status.error.message
return default
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ packages = [

[tool.poetry.dependencies]
python = "^3.8"
openfeature-sdk = ">=0.4.2,<1"
openfeature-sdk = ">=0.7.0,<1"
launchdarkly-server-sdk = "<10"


Expand Down
134 changes: 134 additions & 0 deletions tests/test_data_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import threading
import time
from typing import Optional

from ldclient import Config
from ldclient.integrations.test_data import TestData
from ldclient.interfaces import UpdateProcessor, DataSourceUpdateSink, DataSourceState, DataSourceErrorInfo, \
DataSourceErrorKind
from ldclient.versioned_data_kind import FEATURES


class FailingDataSource(UpdateProcessor):
def __init__(self, config: Config, store, ready: threading.Event):
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
self._ready = ready

def start(self):
if self._data_source_update_sink is None:
return

self._ready.set()

self._data_source_update_sink.update_status(
DataSourceState.OFF,
DataSourceErrorInfo(
DataSourceErrorKind.ERROR_RESPONSE,
401,
time.time(),
str("Bad things")
)
)

def stop(self):
pass

def is_alive(self):
return False

def initialized(self):
return False


class DelayedFailingDataSource(UpdateProcessor):
def __init__(self, config: Config, store, ready: threading.Event):
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
self._ready = ready

def start(self):
if self._data_source_update_sink is None:
return

self._ready.set()

def data_source_failure():
self._data_source_update_sink.update_status(
DataSourceState.OFF,
DataSourceErrorInfo(
DataSourceErrorKind.ERROR_RESPONSE,
401,
time.time(),
str("Bad things")
)
)

threading.Timer(0.1, data_source_failure).start()

def stop(self):
pass

def is_alive(self):
return False

def initialized(self):
return False


class StaleDataSource(UpdateProcessor):
def __init__(self, config: Config, store, ready: threading.Event):
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
self._ready = ready

def start(self):
self._ready.set()
self._data_source_update_sink.update_status(DataSourceState.VALID, None)

def data_source_interrupted():
self._data_source_update_sink.update_status(
DataSourceState.INTERRUPTED,
DataSourceErrorInfo(
DataSourceErrorKind.ERROR_RESPONSE,
408,
time.time(),
str("Less bad things")
)
)

threading.Timer(0.1, data_source_interrupted).start()

def stop(self):
pass

def is_alive(self):
return False

def initialized(self):
return True


class UpdatingDataSource(UpdateProcessor):
def __init__(self, config: Config, store, ready: threading.Event):
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
self._ready = ready

def start(self):
self._ready.set()
self._data_source_update_sink.init({})
self._data_source_update_sink.update_status(DataSourceState.VALID, None)

def update_data():
# The test_data_source is only used to access the flag builder.
# We call _build here, once TestData supports change handlers we should remove this.
self._data_source_update_sink.upsert(FEATURES,
TestData().data_source().flag("potato").on(True)._build(1))

threading.Timer(0.1, update_data).start()

def stop(self):
pass

def is_alive(self):
return False

def initialized(self):
return True
Loading