package smartkit.internal.clientconn;

import com.google.common.base.Optional;
import com.google.gson.Gson;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import smartkit.RetrofitError;
import smartkit.internal.clientconn.SseEventFilter;
import smartkit.internal.clientconn.SseSubscriptionFilter;
import smartkit.internal.clientconn.sse.protocol.EventHandler;
import smartkit.internal.clientconn.sse.protocol.EventSource;
import smartkit.internal.clientconn.sse.protocol.MessageEvent;
import smartkit.models.event.DeviceEvent;
import smartkit.models.event.DeviceHealthEvent;
import smartkit.models.event.DeviceJoinEvent;
import smartkit.models.event.DeviceLifecycleEvent;
import smartkit.models.event.EventType;
import smartkit.models.event.HubHealthEvent;
import smartkit.models.event.InstalledAppLifecycleEvent;
import smartkit.models.event.LocationLifecycleEvent;
import smartkit.models.event.ModeEvent;
import smartkit.models.event.SecurityArmFailureEvent;
import smartkit.models.event.SecurityArmStateEvent;
import smartkit.models.event.SmartAppEvent;
import smartkit.rx.OnNextObserver;
import smartkit.rx.RetrofitErrorObserver;
import smartkit.rx.RetryWithExponentialBackoffDelay;

/* loaded from: classes.dex */
public class SseConnect {
    private static final SseSubscriptionFilter ALL_FILTERS = new SseSubscriptionFilter.Builder().setType(SseSubscriptionFilter.SseFilterType.LOCATION_IDS).addValue(SseSubscriptionFilter.ALL_EVENTS).build();
    private static final int CONNECTION_TIMEOUT_MS = 300;
    private static final int MAX_RETRY_DELAY_MS = 15000;
    private final OkHttpClient client;
    volatile EventSource eventSource;
    private final Gson gson;
    volatile boolean isUpdating;
    private final SseConnectService sseConnectService;
    volatile SseSubscription sseSubscription;
    private final Logger logger = LoggerFactory.a((Class<?>) SseConnect.class);
    final BehaviorSubject<SseConnectState> stateSubject = BehaviorSubject.create();
    final BehaviorSubject<Optional<SseSubscription>> subscriptionSubject = BehaviorSubject.create();
    final Map<SseSubscriptionFilter, Integer> filterMap = new ConcurrentHashMap();
    final PublishSubject<DeviceEvent> deviceEventsSubject = PublishSubject.create();
    final PublishSubject<DeviceHealthEvent> deviceHealthEventsSubject = PublishSubject.create();
    final PublishSubject<DeviceJoinEvent> deviceJoinEventsSubject = PublishSubject.create();
    final PublishSubject<DeviceLifecycleEvent> deviceLifecycleEventsSubject = PublishSubject.create();
    final PublishSubject<HubHealthEvent> hubHealthEventsSubject = PublishSubject.create();
    final PublishSubject<InstalledAppLifecycleEvent> installedAppLifecycleEventsSubject = PublishSubject.create();
    final PublishSubject<LocationLifecycleEvent> locationLifecycleEventsSubject = PublishSubject.create();
    final PublishSubject<ModeEvent> modeEventsSubject = PublishSubject.create();
    final PublishSubject<SecurityArmFailureEvent> securityArmFailureEventsSubject = PublishSubject.create();
    final PublishSubject<SecurityArmStateEvent> securityArmStateEventsSubject = PublishSubject.create();
    final PublishSubject<SmartAppEvent> smartAppEventsSubject = PublishSubject.create();
    volatile SseConnectState sseConnectState = SseConnectState.STOPPED;
    volatile Subscription updateSubscription = Subscriptions.empty();
    private volatile CompositeSubscription rxSubscriptions = new CompositeSubscription();

    /* loaded from: classes3.dex */
    public enum SseConnectState {
        INITIALIZING,
        CONNECTING,
        CONNECTED,
        STOPPING,
        STOPPED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public class SseHandler implements EventHandler {
        SseHandler() {
        }

        @Override // smartkit.internal.clientconn.sse.protocol.EventHandler
        public void onClosed() {
            SseConnect.this.logger.info("EventSource has closed internally");
            synchronized (SseConnect.this) {
                if (SseConnect.this.sseConnectState == SseConnectState.CONNECTED) {
                    SseConnect.this.setState(SseConnectState.CONNECTING);
                }
            }
        }

        @Override // smartkit.internal.clientconn.sse.protocol.EventHandler
        public void onComment(String str) {
            SseConnect.this.logger.info(String.format("EventSource comment: %s", str));
        }

        @Override // smartkit.internal.clientconn.sse.protocol.EventHandler
        public void onError(Throwable th) {
            SseConnect.this.logger.warn(String.format("SSE Connection Error: %s", th.getMessage()));
        }

        @Override // smartkit.internal.clientconn.sse.protocol.EventHandler
        public void onMessage(String str, MessageEvent messageEvent) {
            SseConnect.this.processEvent(EventType.from(str), messageEvent.getData());
        }

        @Override // smartkit.internal.clientconn.sse.protocol.EventHandler
        public void onOpen() {
            SseConnect.this.setState(SseConnectState.CONNECTED);
        }
    }

    public SseConnect(@Nonnull OkHttpClient okHttpClient, @Nonnull SseConnectService sseConnectService, @Nonnull Gson gson) {
        this.client = okHttpClient;
        this.sseConnectService = sseConnectService;
        this.gson = gson;
    }

    private static Integer getOrDefault(@Nonnull Map<SseSubscriptionFilter, Integer> map, @Nonnull SseSubscriptionFilter sseSubscriptionFilter, @Nonnull Integer num) {
        Integer num2 = map.get(sseSubscriptionFilter);
        return num2 == null ? num : num2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addFilters(@Nonnull List<SseSubscriptionFilter> list) {
        int size = this.filterMap.size();
        for (SseSubscriptionFilter sseSubscriptionFilter : list) {
            this.filterMap.put(sseSubscriptionFilter, Integer.valueOf(getOrDefault(this.filterMap, sseSubscriptionFilter, 0).intValue() + 1));
        }
        if (this.filterMap.size() > size) {
            update(new ArrayList(this.filterMap.keySet()));
        }
    }

    void checkFilterCount(@Nonnull SseSubscriptionFilter... sseSubscriptionFilterArr) {
        if (sseSubscriptionFilterArr.length <= 0) {
            throw new IllegalArgumentException("You must provide at least one filter.");
        }
    }

    EventSource createEventSource(@Nonnull SseSubscription sseSubscription) {
        return new EventSource.Builder(new SseHandler(), URI.create(sseSubscription.getRegistrationUrl())).name("SseConnect").connectTimeoutMs(CONNECTION_TIMEOUT_MS).client(this.client).build();
    }

    public Observable<SseConnectionData> getConnectionData() {
        return Observable.combineLatest(this.stateSubject, this.subscriptionSubject, new Func2<SseConnectState, Optional<SseSubscription>, SseConnectionData>() { // from class: smartkit.internal.clientconn.SseConnect.1
            @Override // rx.functions.Func2
            public SseConnectionData call(SseConnectState sseConnectState, Optional<SseSubscription> optional) {
                return new SseConnectionData(sseConnectState, optional.orNull());
            }
        });
    }

    @Deprecated
    public Observable<SseConnectState> getConnectionState() {
        return this.stateSubject;
    }

    public Observable<DeviceEvent> getDeviceEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.deviceEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<DeviceEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.2
            @Override // rx.functions.Func1
            public Boolean call(DeviceEvent deviceEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(deviceEvent.getLocationId()).setDeviceId(deviceEvent.getDeviceId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<DeviceHealthEvent> getDeviceHealthEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.deviceHealthEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<DeviceHealthEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.3
            @Override // rx.functions.Func1
            public Boolean call(DeviceHealthEvent deviceHealthEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(deviceHealthEvent.getLocationId()).setDeviceId(deviceHealthEvent.getDeviceId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<DeviceJoinEvent> getDeviceJoinEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.deviceJoinEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<DeviceJoinEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.4
            @Override // rx.functions.Func1
            public Boolean call(DeviceJoinEvent deviceJoinEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(deviceJoinEvent.getLocationId()).setDeviceId(deviceJoinEvent.getDeviceId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<DeviceLifecycleEvent> getDeviceLifecycleEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.deviceLifecycleEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<DeviceLifecycleEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.5
            @Override // rx.functions.Func1
            public Boolean call(DeviceLifecycleEvent deviceLifecycleEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(deviceLifecycleEvent.getLocationId()).setDeviceId(deviceLifecycleEvent.getDeviceId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<HubHealthEvent> getHubHealthEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.hubHealthEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<HubHealthEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.6
            @Override // rx.functions.Func1
            public Boolean call(HubHealthEvent hubHealthEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(hubHealthEvent.getLocationId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<InstalledAppLifecycleEvent> getInstalledAppLifecycleEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.installedAppLifecycleEventsSubject.asObservable().compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<InstalledAppLifecycleEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.7
            @Override // rx.functions.Func1
            public Boolean call(InstalledAppLifecycleEvent installedAppLifecycleEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(installedAppLifecycleEvent.getLocationId()).setInstalledSmartAppId(installedAppLifecycleEvent.getInstalledAppId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<LocationLifecycleEvent> getLocationLifecycleEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.locationLifecycleEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<LocationLifecycleEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.8
            @Override // rx.functions.Func1
            public Boolean call(LocationLifecycleEvent locationLifecycleEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(locationLifecycleEvent.getLocationId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<ModeEvent> getModeEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.modeEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<ModeEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.9
            @Override // rx.functions.Func1
            public Boolean call(ModeEvent modeEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(modeEvent.getLocationId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<SecurityArmFailureEvent> getSecurityArmFailureEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.securityArmFailureEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<SecurityArmFailureEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.10
            @Override // rx.functions.Func1
            public Boolean call(SecurityArmFailureEvent securityArmFailureEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(securityArmFailureEvent.getLocationId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<SecurityArmStateEvent> getSecurityArmStateEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.securityArmStateEventsSubject.compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<SecurityArmStateEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.11
            @Override // rx.functions.Func1
            public Boolean call(SecurityArmStateEvent securityArmStateEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(securityArmStateEvent.getLocationId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    public Observable<SmartAppEvent> getSmartAppEvents(@Nonnull final SseSubscriptionFilter... sseSubscriptionFilterArr) {
        checkFilterCount(sseSubscriptionFilterArr);
        return this.smartAppEventsSubject.asObservable().compose(new SubscriptionFilterTransformer(this, sseSubscriptionFilterArr)).filter(new Func1<SmartAppEvent, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.12
            @Override // rx.functions.Func1
            public Boolean call(SmartAppEvent smartAppEvent) {
                return Boolean.valueOf(SseEventFilter.filterData(new SseEventFilter.FilterableData.Builder().setLocationId(smartAppEvent.getLocationId()).setInstalledSmartAppId(smartAppEvent.getInstalledAppId()).build(), sseSubscriptionFilterArr));
            }
        });
    }

    synchronized Observable<SseSubscription> getStartingObservable() {
        return this.sseConnectService.createInactiveSubscription(new ArrayList(this.filterMap.keySet())).retryWhen(new RetryWithExponentialBackoffDelay.Builder().setMaxDelay(15000L).build()).doOnNext(new Action1<SseSubscription>() { // from class: smartkit.internal.clientconn.SseConnect.13
            @Override // rx.functions.Action1
            public void call(SseSubscription sseSubscription) {
                SseConnect.this.setSubscription(sseSubscription);
                SseConnect.this.setState(SseConnectState.CONNECTING);
                SseConnect.this.eventSource = SseConnect.this.createEventSource(sseSubscription);
                SseConnect.this.eventSource.start();
            }
        });
    }

    synchronized Observable<Void> getStoppingObservable() {
        return Observable.defer(new Func0<Observable<String>>() { // from class: smartkit.internal.clientconn.SseConnect.15
            @Override // rx.functions.Func0, java.util.concurrent.Callable
            public Observable<String> call() {
                SseConnect.this.eventSource.close();
                SseConnect.this.eventSource = null;
                String subscriptionId = SseConnect.this.sseSubscription.getSubscriptionId();
                SseConnect.this.setSubscription(null);
                SseConnect.this.setState(SseConnectState.STOPPED);
                return Observable.just(subscriptionId);
            }
        }).flatMap(new Func1<String, Observable<Void>>() { // from class: smartkit.internal.clientconn.SseConnect.14
            @Override // rx.functions.Func1
            public Observable<Void> call(String str) {
                return SseConnect.this.sseConnectService.deleteSubscription(str);
            }
        });
    }

    synchronized Observable<SseSubscription> getUpdateObservable(@Nonnull List<SseSubscriptionFilter> list) {
        return this.sseConnectService.updateSubscription(this.sseSubscription.getSubscriptionId(), list).retryWhen(new RetryWithExponentialBackoffDelay.Builder().setMaxDelay(15000L).build()).doOnNext(new Action1<SseSubscription>() { // from class: smartkit.internal.clientconn.SseConnect.16
            @Override // rx.functions.Action1
            public void call(SseSubscription sseSubscription) {
                SseConnect.this.setSubscription(sseSubscription);
                ArrayList arrayList = new ArrayList(SseConnect.this.filterMap.keySet());
                List<SseSubscriptionFilter> subscriptionFilters = sseSubscription.getSubscriptionFilters();
                SseConnect.this.isUpdating = false;
                if (SseSubscriptionFilter.filtersEqual(arrayList, subscriptionFilters)) {
                    return;
                }
                SseConnect.this.update(arrayList);
            }
        });
    }

    void processEvent(@Nonnull EventType eventType, @Nonnull String str) {
        switch (eventType) {
            case DEVICE:
                this.logger.debug(String.format("SSE Device event: %s", str));
                this.deviceEventsSubject.onNext((DeviceEvent) this.gson.fromJson(str, DeviceEvent.class));
                return;
            case DEVICE_HEALTH:
                this.logger.debug(String.format("SSE Device Health Event: %s", str));
                this.deviceHealthEventsSubject.onNext((DeviceHealthEvent) this.gson.fromJson(str, DeviceHealthEvent.class));
                return;
            case DEVICE_JOIN:
                this.logger.debug(String.format("SSE Device Join Event: %s", str));
                this.deviceJoinEventsSubject.onNext((DeviceJoinEvent) this.gson.fromJson(str, DeviceJoinEvent.class));
                return;
            case DEVICE_LIFECYCLE:
                this.logger.debug(String.format("SSE Device Lifecycle Event: %s", str));
                DeviceLifecycleEvent deviceLifecycleEvent = (DeviceLifecycleEvent) this.gson.fromJson(str, DeviceLifecycleEvent.class);
                if (deviceLifecycleEvent.getChangeType() != DeviceLifecycleEvent.ChangeType.UNKNOWN) {
                    this.deviceLifecycleEventsSubject.onNext(deviceLifecycleEvent);
                    return;
                }
                return;
            case HUB_HEALTH:
                this.logger.debug(String.format("SSE Hub Health Event: %s", str));
                this.hubHealthEventsSubject.onNext((HubHealthEvent) this.gson.fromJson(str, HubHealthEvent.class));
                return;
            case INSTALLED_APP_LIFECYCLE:
                this.logger.debug(String.format("SSE Installed App Lifecycle Event: %s", str));
                InstalledAppLifecycleEvent installedAppLifecycleEvent = (InstalledAppLifecycleEvent) this.gson.fromJson(str, InstalledAppLifecycleEvent.class);
                if (installedAppLifecycleEvent.getChangeType() != InstalledAppLifecycleEvent.ChangeType.UNKNOWN) {
                    this.installedAppLifecycleEventsSubject.onNext(installedAppLifecycleEvent);
                    return;
                }
                return;
            case LOCATION_LIFECYCLE:
                this.logger.debug(String.format("SSE Location Lifecycle Event: %s", str));
                LocationLifecycleEvent locationLifecycleEvent = (LocationLifecycleEvent) this.gson.fromJson(str, LocationLifecycleEvent.class);
                if (locationLifecycleEvent.getChangeType() != LocationLifecycleEvent.ChangeType.UNKNOWN) {
                    this.locationLifecycleEventsSubject.onNext(locationLifecycleEvent);
                    return;
                }
                return;
            case MODE:
                this.logger.debug(String.format("SSE Mode Event: %s", str));
                this.modeEventsSubject.onNext((ModeEvent) this.gson.fromJson(str, ModeEvent.class));
                return;
            case SMART_APP:
                this.logger.debug(String.format("SSE SmartApp Event: %s", str));
                this.smartAppEventsSubject.onNext((SmartAppEvent) this.gson.fromJson(str, SmartAppEvent.class));
                return;
            case SECURITY_ARM_FAILURE:
                this.logger.debug(String.format("SSE Security Arm Failure Event: %s", str));
                this.securityArmFailureEventsSubject.onNext((SecurityArmFailureEvent) this.gson.fromJson(str, SecurityArmFailureEvent.class));
                return;
            case SECURITY_ARM_STATE:
                this.logger.debug(String.format("SSE Security Arm State Event: %s", str));
                this.securityArmStateEventsSubject.onNext((SecurityArmStateEvent) this.gson.fromJson(str, SecurityArmStateEvent.class));
                return;
            case CONTROL:
                this.logger.debug(String.format("SSE Control Event: %s", str));
                return;
            default:
                this.logger.debug(String.format("SSE Unknown Event: %s", str));
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeFilters(@Nonnull List<SseSubscriptionFilter> list) {
        int size = this.filterMap.size();
        for (SseSubscriptionFilter sseSubscriptionFilter : list) {
            Integer valueOf = Integer.valueOf(getOrDefault(this.filterMap, sseSubscriptionFilter, 0).intValue() - 1);
            if (valueOf.intValue() <= 0) {
                this.filterMap.remove(sseSubscriptionFilter);
            } else if (valueOf.intValue() >= 1) {
                this.filterMap.put(sseSubscriptionFilter, valueOf);
            }
        }
        if (this.filterMap.size() < size) {
            update(new ArrayList(this.filterMap.keySet()));
        }
    }

    synchronized void setState(@Nonnull SseConnectState sseConnectState) {
        if (this.sseConnectState != sseConnectState) {
            this.logger.debug(String.format("SSE Connection State: %1$s -> %2$s", this.sseConnectState, sseConnectState));
            this.sseConnectState = sseConnectState;
            this.stateSubject.onNext(sseConnectState);
        }
    }

    synchronized void setSubscription(@Nullable SseSubscription sseSubscription) {
        this.sseSubscription = sseSubscription;
        this.subscriptionSubject.onNext(Optional.fromNullable(sseSubscription));
    }

    public synchronized void start() {
        this.updateSubscription.unsubscribe();
        if (this.sseConnectState == SseConnectState.STOPPING) {
            this.updateSubscription = getConnectionState().first(new Func1<SseConnectState, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.18
                @Override // rx.functions.Func1
                public Boolean call(SseConnectState sseConnectState) {
                    return Boolean.valueOf(sseConnectState == SseConnectState.STOPPED);
                }
            }).subscribe(new OnNextObserver<SseConnectState>() { // from class: smartkit.internal.clientconn.SseConnect.17
                @Override // rx.Observer
                public void onNext(SseConnectState sseConnectState) {
                    SseConnect.this.start();
                }
            });
        } else if (this.sseConnectState == SseConnectState.STOPPED) {
            setState(SseConnectState.INITIALIZING);
            this.rxSubscriptions.unsubscribe();
            this.rxSubscriptions = new CompositeSubscription();
            this.rxSubscriptions.add(getStartingObservable().subscribeOn(Schedulers.io()).subscribe(new RetrofitErrorObserver<SseSubscription>() { // from class: smartkit.internal.clientconn.SseConnect.19
                @Override // smartkit.rx.RetrofitObserver
                public void onError(RetrofitError retrofitError) {
                    SseConnect.this.logger.error("SSE Start Error", (Throwable) retrofitError);
                }
            }));
        }
    }

    public synchronized void stop() {
        switch (this.sseConnectState) {
            case STOPPING:
            case STOPPED:
                break;
            default:
                if (this.eventSource != null) {
                    setState(SseConnectState.STOPPING);
                    this.isUpdating = false;
                    getStoppingObservable().subscribeOn(Schedulers.io()).subscribe(new RetrofitErrorObserver<Void>() { // from class: smartkit.internal.clientconn.SseConnect.20
                        @Override // smartkit.rx.RetrofitObserver
                        public void onError(RetrofitError retrofitError) {
                            SseConnect.this.logger.warn("SSE Stop Error", (Throwable) retrofitError);
                        }
                    });
                    break;
                } else {
                    setState(SseConnectState.STOPPED);
                    break;
                }
        }
    }

    synchronized void update(@Nonnull final List<SseSubscriptionFilter> list) {
        switch (this.sseConnectState) {
            case INITIALIZING:
                this.updateSubscription.unsubscribe();
                this.updateSubscription = getConnectionState().first(new Func1<SseConnectState, Boolean>() { // from class: smartkit.internal.clientconn.SseConnect.22
                    @Override // rx.functions.Func1
                    public Boolean call(SseConnectState sseConnectState) {
                        return Boolean.valueOf(sseConnectState == SseConnectState.CONNECTED || sseConnectState == SseConnectState.CONNECTING);
                    }
                }).subscribe(new OnNextObserver<SseConnectState>() { // from class: smartkit.internal.clientconn.SseConnect.21
                    @Override // rx.Observer
                    public void onNext(SseConnectState sseConnectState) {
                        SseConnect.this.update(list);
                    }
                });
                break;
            case CONNECTING:
            case CONNECTED:
                if (!this.isUpdating) {
                    this.isUpdating = true;
                    this.rxSubscriptions.add(getUpdateObservable(list).subscribeOn(Schedulers.io()).subscribe(new RetrofitErrorObserver<SseSubscription>() { // from class: smartkit.internal.clientconn.SseConnect.23
                        @Override // smartkit.rx.RetrofitObserver
                        public void onError(RetrofitError retrofitError) {
                            SseConnect.this.logger.error("SSE Update Error", (Throwable) retrofitError);
                        }
                    }));
                    break;
                }
                break;
        }
    }
}
