Skip to content

Commit a9cdfea

Browse files
authored
Merge pull request #1690 from AvaloniaUI/custom-rx
Use custom Rx classes
2 parents 6720bb9 + 9366434 commit a9cdfea

26 files changed

+1004
-550
lines changed

src/Avalonia.Base/AvaloniaObject.cs

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Avalonia.Data;
1111
using Avalonia.Diagnostics;
1212
using Avalonia.Logging;
13+
using Avalonia.Reactive;
1314
using Avalonia.Threading;
1415
using Avalonia.Utilities;
1516

@@ -38,7 +39,7 @@ public class AvaloniaObject : IAvaloniaObject, IAvaloniaObjectDebug, INotifyProp
3839
/// Maintains a list of direct property binding subscriptions so that the binding source
3940
/// doesn't get collected.
4041
/// </summary>
41-
private List<IDisposable> _directBindings;
42+
private List<DirectBindingSubscription> _directBindings;
4243

4344
/// <summary>
4445
/// Event handler for <see cref="INotifyPropertyChanged"/> implementation.
@@ -359,25 +360,12 @@ public IDisposable Bind(
359360
property,
360361
description);
361362

362-
IDisposable subscription = null;
363-
364363
if (_directBindings == null)
365364
{
366-
_directBindings = new List<IDisposable>();
365+
_directBindings = new List<DirectBindingSubscription>();
367366
}
368367

369-
subscription = source
370-
.Select(x => CastOrDefault(x, property.PropertyType))
371-
.Do(_ => { }, () => _directBindings.Remove(subscription))
372-
.Subscribe(x => SetDirectValue(property, x));
373-
374-
_directBindings.Add(subscription);
375-
376-
return Disposable.Create(() =>
377-
{
378-
subscription.Dispose();
379-
_directBindings.Remove(subscription);
380-
});
368+
return new DirectBindingSubscription(this, property, source);
381369
}
382370
else
383371
{
@@ -908,5 +896,38 @@ private void LogPropertySet(AvaloniaProperty property, object value, BindingPrio
908896
value,
909897
priority);
910898
}
899+
900+
private class DirectBindingSubscription : IObserver<object>, IDisposable
901+
{
902+
readonly AvaloniaObject _owner;
903+
readonly AvaloniaProperty _property;
904+
IDisposable _subscription;
905+
906+
public DirectBindingSubscription(
907+
AvaloniaObject owner,
908+
AvaloniaProperty property,
909+
IObservable<object> source)
910+
{
911+
_owner = owner;
912+
_property = property;
913+
_owner._directBindings.Add(this);
914+
_subscription = source.Subscribe(this);
915+
}
916+
917+
public void Dispose()
918+
{
919+
_subscription.Dispose();
920+
_owner._directBindings.Remove(this);
921+
}
922+
923+
public void OnCompleted() => Dispose();
924+
public void OnError(Exception error) => Dispose();
925+
926+
public void OnNext(object value)
927+
{
928+
var castValue = CastOrDefault(value, _property.PropertyType);
929+
_owner.SetDirectValue(_property, castValue);
930+
}
931+
}
911932
}
912933
}

src/Avalonia.Base/AvaloniaObjectExtensions.cs

Lines changed: 16 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -36,32 +36,15 @@ public static IBinding ToBinding<T>(this IObservable<T> source)
3636
/// An observable which fires immediately with the current value of the property on the
3737
/// object and subsequently each time the property value changes.
3838
/// </returns>
39+
/// <remarks>
40+
/// The subscription to <paramref name="o"/> is created using a weak reference.
41+
/// </remarks>
3942
public static IObservable<object> GetObservable(this IAvaloniaObject o, AvaloniaProperty property)
4043
{
4144
Contract.Requires<ArgumentNullException>(o != null);
4245
Contract.Requires<ArgumentNullException>(property != null);
4346

44-
return new AvaloniaObservable<object>(
45-
observer =>
46-
{
47-
EventHandler<AvaloniaPropertyChangedEventArgs> handler = (s, e) =>
48-
{
49-
if (e.Property == property)
50-
{
51-
observer.OnNext(e.NewValue);
52-
}
53-
};
54-
55-
observer.OnNext(o.GetValue(property));
56-
57-
o.PropertyChanged += handler;
58-
59-
return Disposable.Create(() =>
60-
{
61-
o.PropertyChanged -= handler;
62-
});
63-
},
64-
GetDescription(o, property));
47+
return new AvaloniaPropertyObservable<object>(o, property);
6548
}
6649

6750
/// <summary>
@@ -74,51 +57,36 @@ public static IObservable<object> GetObservable(this IAvaloniaObject o, Avalonia
7457
/// An observable which fires immediately with the current value of the property on the
7558
/// object and subsequently each time the property value changes.
7659
/// </returns>
60+
/// <remarks>
61+
/// The subscription to <paramref name="o"/> is created using a weak reference.
62+
/// </remarks>
7763
public static IObservable<T> GetObservable<T>(this IAvaloniaObject o, AvaloniaProperty<T> property)
7864
{
7965
Contract.Requires<ArgumentNullException>(o != null);
8066
Contract.Requires<ArgumentNullException>(property != null);
8167

82-
return o.GetObservable((AvaloniaProperty)property).Cast<T>();
68+
return new AvaloniaPropertyObservable<T>(o, property);
8369
}
8470

8571
/// <summary>
86-
/// Gets an observable for a <see cref="AvaloniaProperty"/>.
72+
/// Gets an observable that listens for property changed events for an
73+
/// <see cref="AvaloniaProperty"/>.
8774
/// </summary>
8875
/// <param name="o">The object.</param>
89-
/// <typeparam name="T">The type of the property.</typeparam>
9076
/// <param name="property">The property.</param>
9177
/// <returns>
92-
/// An observable which when subscribed pushes the old and new values of the property each
93-
/// time it is changed. Note that the observable returned from this method does not fire
94-
/// with the current value of the property immediately.
78+
/// An observable which when subscribed pushes the property changed event args
79+
/// each time a <see cref="IAvaloniaObject.PropertyChanged"/> event is raised
80+
/// for the specified property.
9581
/// </returns>
96-
public static IObservable<Tuple<T, T>> GetObservableWithHistory<T>(
82+
public static IObservable<AvaloniaPropertyChangedEventArgs> GetPropertyChangedObservable(
9783
this IAvaloniaObject o,
98-
AvaloniaProperty<T> property)
84+
AvaloniaProperty property)
9985
{
10086
Contract.Requires<ArgumentNullException>(o != null);
10187
Contract.Requires<ArgumentNullException>(property != null);
10288

103-
return new AvaloniaObservable<Tuple<T, T>>(
104-
observer =>
105-
{
106-
EventHandler<AvaloniaPropertyChangedEventArgs> handler = (s, e) =>
107-
{
108-
if (e.Property == property)
109-
{
110-
observer.OnNext(Tuple.Create((T)e.OldValue, (T)e.NewValue));
111-
}
112-
};
113-
114-
o.PropertyChanged += handler;
115-
116-
return Disposable.Create(() =>
117-
{
118-
o.PropertyChanged -= handler;
119-
});
120-
},
121-
GetDescription(o, property));
89+
return new AvaloniaPropertyChangedObservable(o, property);
12290
}
12391

12492
/// <summary>
@@ -166,23 +134,6 @@ public static ISubject<T> GetSubject<T>(
166134
o.GetObservable(property));
167135
}
168136

169-
/// <summary>
170-
/// Gets a weak observable for a <see cref="AvaloniaProperty"/>.
171-
/// </summary>
172-
/// <param name="o">The object.</param>
173-
/// <param name="property">The property.</param>
174-
/// <returns>An observable.</returns>
175-
public static IObservable<object> GetWeakObservable(this IAvaloniaObject o, AvaloniaProperty property)
176-
{
177-
Contract.Requires<ArgumentNullException>(o != null);
178-
Contract.Requires<ArgumentNullException>(property != null);
179-
180-
return new WeakPropertyChangedObservable(
181-
new WeakReference<IAvaloniaObject>(o),
182-
property,
183-
GetDescription(o, property));
184-
}
185-
186137
/// <summary>
187138
/// Binds a property on an <see cref="IAvaloniaObject"/> to an <see cref="IBinding"/>.
188139
/// </summary>

src/Avalonia.Base/Collections/NotifyCollectionChangedExtensions.cs

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22
// Licensed under the MIT license. See licence.md file in the project root for full license information.
33

44
using System;
5-
using System.Collections;
6-
using System.Collections.Generic;
75
using System.Collections.Specialized;
8-
using System.Reactive;
9-
using System.Reactive.Disposables;
106
using System.Reactive.Linq;
11-
using System.Reactive.Subjects;
7+
using Avalonia.Reactive;
128
using Avalonia.Utilities;
139

1410
namespace Avalonia.Collections
@@ -43,9 +39,8 @@ public static IDisposable WeakSubscribe(
4339
Contract.Requires<ArgumentNullException>(collection != null);
4440
Contract.Requires<ArgumentNullException>(handler != null);
4541

46-
return
47-
collection.GetWeakCollectionChangedObservable()
48-
.Subscribe(e => handler.Invoke(collection, e));
42+
return collection.GetWeakCollectionChangedObservable()
43+
.Subscribe(e => handler(collection, e));
4944
}
5045

5146
/// <summary>
@@ -63,18 +58,13 @@ public static IDisposable WeakSubscribe(
6358
Contract.Requires<ArgumentNullException>(collection != null);
6459
Contract.Requires<ArgumentNullException>(handler != null);
6560

66-
return
67-
collection.GetWeakCollectionChangedObservable()
68-
.Subscribe(handler);
61+
return collection.GetWeakCollectionChangedObservable().Subscribe(handler);
6962
}
7063

71-
private class WeakCollectionChangedObservable : ObservableBase<NotifyCollectionChangedEventArgs>,
64+
private class WeakCollectionChangedObservable : LightweightObservableBase<NotifyCollectionChangedEventArgs>,
7265
IWeakSubscriber<NotifyCollectionChangedEventArgs>
7366
{
7467
private WeakReference<INotifyCollectionChanged> _sourceReference;
75-
private readonly Subject<NotifyCollectionChangedEventArgs> _changed = new Subject<NotifyCollectionChangedEventArgs>();
76-
77-
private int _count;
7868

7969
public WeakCollectionChangedObservable(WeakReference<INotifyCollectionChanged> source)
8070
{
@@ -83,43 +73,28 @@ public WeakCollectionChangedObservable(WeakReference<INotifyCollectionChanged> s
8373

8474
public void OnEvent(object sender, NotifyCollectionChangedEventArgs e)
8575
{
86-
_changed.OnNext(e);
76+
PublishNext(e);
8777
}
8878

89-
protected override IDisposable SubscribeCore(IObserver<NotifyCollectionChangedEventArgs> observer)
79+
protected override void Initialize()
9080
{
9181
if (_sourceReference.TryGetTarget(out INotifyCollectionChanged instance))
9282
{
93-
if (_count++ == 0)
94-
{
95-
WeakSubscriptionManager.Subscribe(
96-
instance,
97-
nameof(instance.CollectionChanged),
98-
this);
99-
}
100-
101-
return Observable.Using(() => Disposable.Create(DecrementCount), _ => _changed)
102-
.Subscribe(observer);
103-
}
104-
else
105-
{
106-
_changed.OnCompleted();
107-
observer.OnCompleted();
108-
return Disposable.Empty;
83+
WeakSubscriptionManager.Subscribe(
84+
instance,
85+
nameof(instance.CollectionChanged),
86+
this);
10987
}
11088
}
11189

112-
private void DecrementCount()
90+
protected override void Deinitialize()
11391
{
114-
if (--_count == 0)
92+
if (_sourceReference.TryGetTarget(out INotifyCollectionChanged instance))
11593
{
116-
if (_sourceReference.TryGetTarget(out INotifyCollectionChanged instance))
117-
{
118-
WeakSubscriptionManager.Unsubscribe(
119-
instance,
120-
nameof(instance.CollectionChanged),
121-
this);
122-
}
94+
WeakSubscriptionManager.Unsubscribe(
95+
instance,
96+
nameof(instance.CollectionChanged),
97+
this);
12398
}
12499
}
125100
}

src/Avalonia.Base/Data/Core/BindingExpression.cs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,23 @@
77
using System.Reactive.Subjects;
88
using Avalonia.Data.Converters;
99
using Avalonia.Logging;
10+
using Avalonia.Reactive;
1011
using Avalonia.Utilities;
1112

1213
namespace Avalonia.Data.Core
1314
{
1415
/// <summary>
1516
/// Binds to an expression on an object using a type value converter to convert the values
16-
/// that are send and received.
17+
/// that are sent and received.
1718
/// </summary>
18-
public class BindingExpression : ISubject<object>, IDescription
19+
public class BindingExpression : LightweightObservableBase<object>, ISubject<object>, IDescription
1920
{
2021
private readonly ExpressionObserver _inner;
2122
private readonly Type _targetType;
2223
private readonly object _fallbackValue;
2324
private readonly BindingPriority _priority;
24-
private readonly Subject<object> _errors = new Subject<object>();
25+
InnerListener _innerListener;
26+
WeakReference<object> _value;
2527

2628
/// <summary>
2729
/// Initializes a new instance of the <see cref="ExpressionObserver"/> class.
@@ -139,7 +141,7 @@ public void OnNext(object value)
139141
"IValueConverter should not return non-errored BindingNotification.");
140142
}
141143

142-
_errors.OnNext(notification);
144+
PublishNext(notification);
143145

144146
if (_fallbackValue != AvaloniaProperty.UnsetValue)
145147
{
@@ -170,12 +172,18 @@ public void OnNext(object value)
170172
}
171173
}
172174

173-
/// <inheritdoc/>
174-
public IDisposable Subscribe(IObserver<object> observer)
175+
protected override void Initialize() => _innerListener = new InnerListener(this);
176+
protected override void Deinitialize() => _innerListener.Dispose();
177+
178+
protected override void Subscribed(IObserver<object> observer, bool first)
175179
{
176-
return _inner.Select(ConvertValue).Merge(_errors).Subscribe(observer);
180+
if (!first && _value != null && _value.TryGetTarget(out var val) == true)
181+
{
182+
observer.OnNext(val);
183+
}
177184
}
178185

186+
/// <inheritdoc/>
179187
private object ConvertValue(object value)
180188
{
181189
var notification = value as BindingNotification;
@@ -301,5 +309,28 @@ private static BindingNotification Merge(BindingNotification a, BindingNotificat
301309

302310
return a;
303311
}
312+
313+
public class InnerListener : IObserver<object>, IDisposable
314+
{
315+
private readonly BindingExpression _owner;
316+
private readonly IDisposable _dispose;
317+
318+
public InnerListener(BindingExpression owner)
319+
{
320+
_owner = owner;
321+
_dispose = owner._inner.Subscribe(this);
322+
}
323+
324+
public void Dispose() => _dispose.Dispose();
325+
public void OnCompleted() => _owner.PublishCompleted();
326+
public void OnError(Exception error) => _owner.PublishError(error);
327+
328+
public void OnNext(object value)
329+
{
330+
var converted = _owner.ConvertValue(value);
331+
_owner._value = new WeakReference<object>(converted);
332+
_owner.PublishNext(converted);
333+
}
334+
}
304335
}
305336
}

0 commit comments

Comments
 (0)