Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Resource Types:
<h3 id="duck.knative.dev/v1.AppliedEventPoliciesStatus">AppliedEventPoliciesStatus
</h3>
<p>
(<em>Appears on:</em><a href="#duck.knative.dev/v1.ChannelableStatus">ChannelableStatus</a>, <a href="#eventing.knative.dev/v1.BrokerStatus">BrokerStatus</a>, <a href="#eventing.knative.dev/v1alpha1.RequestReplyStatus">RequestReplyStatus</a>, <a href="#flows.knative.dev/v1.ParallelStatus">ParallelStatus</a>, <a href="#flows.knative.dev/v1.SequenceStatus">SequenceStatus</a>, <a href="#sinks.knative.dev/v1alpha1.IntegrationSinkStatus">IntegrationSinkStatus</a>, <a href="#sinks.knative.dev/v1alpha1.JobSinkStatus">JobSinkStatus</a>)
(<em>Appears on:</em><a href="#duck.knative.dev/v1.ChannelableStatus">ChannelableStatus</a>, <a href="#eventing.knative.dev/v1.BrokerStatus">BrokerStatus</a>, <a href="#eventing.knative.dev/v1alpha1.EventTransformStatus">EventTransformStatus</a>, <a href="#eventing.knative.dev/v1alpha1.RequestReplyStatus">RequestReplyStatus</a>, <a href="#flows.knative.dev/v1.ParallelStatus">ParallelStatus</a>, <a href="#flows.knative.dev/v1.SequenceStatus">SequenceStatus</a>, <a href="#sinks.knative.dev/v1alpha1.IntegrationSinkStatus">IntegrationSinkStatus</a>, <a href="#sinks.knative.dev/v1alpha1.JobSinkStatus">JobSinkStatus</a>)
</p>
<p>
<p>AppliedEventPoliciesStatus contains the list of policies which apply to a resource.
Expand Down Expand Up @@ -3515,6 +3515,23 @@ It exposes the endpoint as an URI to get events delivered.</p>
</tr>
<tr>
<td>
<code>AppliedEventPoliciesStatus</code><br/>
<em>
<a href="#duck.knative.dev/v1.AppliedEventPoliciesStatus">
AppliedEventPoliciesStatus
</a>
</em>
</td>
<td>
<p>
(Members of <code>AppliedEventPoliciesStatus</code> are embedded into this type.)
</p>
<em>(Optional)</em>
<p>AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this EventTransform.</p>
</td>
</tr>
<tr>
<td>
<code>jsonata</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.JsonataEventTransformationStatus">
Expand Down
22 changes: 20 additions & 2 deletions pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

const (
TransformConditionAddressable apis.ConditionType = "Addressable"
TransformationConditionReady apis.ConditionType = "TransformationReady"
TransformConditionAddressable apis.ConditionType = "Addressable"
TransformationConditionReady apis.ConditionType = "TransformationReady"
TransformationEventPoliciesReady apis.ConditionType = "EventPoliciesReady"

TransformationAddressableEmptyURL string = "NoURL"
TransformationAddressableWaitingForServiceEndpoints string = "WaitingForServiceEndpoints"
Expand All @@ -48,6 +49,7 @@ const (
var TransformCondSet = apis.NewLivingConditionSet(
TransformationConditionReady,
TransformConditionAddressable,
TransformationEventPoliciesReady,
)

// transformJsonataConditionSet is the subset of conditions for the Jsonata transformation
Expand Down Expand Up @@ -211,3 +213,19 @@ func (ts *EventTransformStatus) SetAddresses(addresses ...duckv1.Addressable) {
}
ts.GetConditionSet().Manage(ts).MarkTrue(TransformConditionAddressable)
}

func (ts *EventTransformStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
TransformCondSet.Manage(ts).MarkFalse(TransformationEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ts *EventTransformStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
TransformCondSet.Manage(ts).MarkUnknown(TransformationEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ts *EventTransformStatus) MarkEventPoliciesTrue() {
TransformCondSet.Manage(ts).MarkTrue(TransformationEventPoliciesReady)
}

func (ts *EventTransformStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
TransformCondSet.Manage(ts).MarkTrueWithReason(TransformationEventPoliciesReady, reason, messageFormat, messageA...)
}
15 changes: 10 additions & 5 deletions pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ func TestFullLifecycle(t *testing.T) {
topLevel := et.Status.GetCondition(apis.ConditionReady)
transformation := et.Status.GetCondition(TransformationConditionReady)
addressable := et.Status.GetCondition(TransformConditionAddressable)
eventPolicies := et.Status.GetCondition(TransformationEventPoliciesReady)

assert.Equal(t, corev1.ConditionUnknown, topLevel.Status)
assert.Equal(t, corev1.ConditionUnknown, transformation.Status)
assert.Equal(t, corev1.ConditionUnknown, addressable.Status)
assert.Len(t, et.Status.Conditions, 3)
assert.Equal(t, corev1.ConditionUnknown, eventPolicies.Status)
assert.Len(t, et.Status.Conditions, 4)
assert.Equal(t, false, et.Status.IsReady())

ds := appsv1.DeploymentStatus{
Expand All @@ -75,26 +77,29 @@ func TestFullLifecycle(t *testing.T) {

deploymentCondition := et.Status.GetCondition(TransformationJsonataDeploymentReady)
assert.Equal(t, corev1.ConditionTrue, deploymentCondition.Status)
assert.Len(t, et.Status.Conditions, 4)
assert.Len(t, et.Status.Conditions, 5)
assert.Equal(t, false, et.Status.IsReady())

transformationCondition := et.Status.GetCondition(TransformationConditionReady)
assert.Equal(t, corev1.ConditionUnknown, transformationCondition.Status, et)
assert.Len(t, et.Status.Conditions, 4)
assert.Len(t, et.Status.Conditions, 5)
assert.Equal(t, false, et.Status.IsReady())

et.Status.PropagateJsonataSinkBindingUnset()

transformationCondition = et.Status.GetCondition(TransformationConditionReady)
assert.Equal(t, corev1.ConditionTrue, transformationCondition.Status, et)
assert.Len(t, et.Status.Conditions, 5)
assert.Len(t, et.Status.Conditions, 6)
assert.Equal(t, false, et.Status.IsReady())

et.Status.SetAddresses(duckv1.Addressable{URL: apis.HTTPS("example.com")})
addrCondition := et.Status.GetCondition(TransformConditionAddressable)
assert.Equal(t, corev1.ConditionTrue, addrCondition.Status, et)
assert.Len(t, et.Status.Conditions, 5)
assert.Len(t, et.Status.Conditions, 6)
assert.Equal(t, false, et.Status.IsReady())

et.Status.MarkEventPoliciesTrue()
assert.Len(t, et.Status.Conditions, 6)
assert.Equal(t, true, et.Status.IsReady())

// All conditions are ready
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventtransform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
Expand Down Expand Up @@ -130,6 +131,10 @@ type EventTransformStatus struct {
// +optional
duckv1.AddressStatus `json:",inline"`

// AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this EventTransform.
// +optional
eventingduckv1.AppliedEventPoliciesStatus `json:",inline"`

// JsonataTransformationStatus is the status associated with JsonataEventTransformationSpec.
// +optional
JsonataTransformationStatus *JsonataEventTransformationStatus `json:"jsonata,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions pkg/reconciler/eventtransform/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@ import (
"context"

cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/certificates"
"knative.dev/eventing/pkg/eventingtls"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered"
rolebindinginformer "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
eventtransforminformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtransform"
sinkbindinginformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/sinkbinding/filtered"
"knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/eventtransform"
Expand All @@ -49,6 +54,10 @@ const (
NameLabelKey = "eventing.knative.dev/event-transform-name"
)

type envConfig struct {
AuthProxyImage string `envconfig:"AUTH_PROXY_IMAGE" required:"true"`
}

func NewController(
ctx context.Context,
cmw configmap.Watcher,
Expand All @@ -62,6 +71,8 @@ func NewController(
certificatesSecretInformer := secretinformer.Get(ctx, certificates.SecretLabelSelectorPair)
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector)
dynamicCertificatesInformer := certificates.NewDynamicCertificatesInformer()
eventPolicyInformer := eventpolicyinformer.Get(ctx)
rolebindingInformer := rolebindinginformer.Get(ctx)

// Create a custom informer as one in knative/pkg doesn't exist for endpoints.
jsonataEndpointFactory := informers.NewSharedInformerFactoryWithOptions(
Expand All @@ -73,6 +84,11 @@ func NewController(
)
jsonataEndpointInformer := jsonataEndpointFactory.Core().V1().Endpoints()

env := &envConfig{}
if err := envconfig.Process("", env); err != nil {
logging.FromContext(ctx).Panicf("unable to process EventTransform's required environment variables: %v", err)
}

var globalResync func()
var enqueueControllerOf func(interface{})

Expand Down Expand Up @@ -104,6 +120,10 @@ func NewController(
cmCertificateLister: dynamicCertificatesInformer.Lister(),
certificatesSecretLister: certificatesSecretInformer.Lister(),
trustBundleConfigMapLister: trustBundleConfigMapInformer.Lister(),
eventPolicyLister: eventPolicyInformer.Lister(),
rolebindingLister: rolebindingInformer.Lister(),
eventTransformLister: eventTransformInformer.Lister(),
authProxyImage: env.AuthProxyImage,
configWatcher: configWatcher,
}

Expand All @@ -126,6 +146,13 @@ func NewController(
jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))

eventTransformGK := v1alpha1.SchemeGroupVersion.WithKind("EventTransform").GroupKind()
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
eventTransformInformer.Informer().GetIndexer(),
eventTransformGK,
impl.EnqueueKey,
))

// Start the factory after creating all necessary informers.
jsonataEndpointFactory.Start(ctx.Done())
jsonataEndpointFactory.WaitForCacheSync(ctx.Done())
Expand Down
Loading
Loading