diff --git a/package.json b/package.json index 7f539b3..1e725ac 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ngx-mqtt", - "version": "6.3.0", + "version": "6.3.1", "description": "ngx mqtt client library", "main": "bundles/ngx-mqtt.min.js", "module": "./src/index.js", diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index d886a81..729ccb7 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -143,7 +143,7 @@ export class MqttService { * Every new subscriber gets the latest message. */ public observeRetained(filterString: string): Observable { - return this._generalObserve(filterString, () => publishReplay()); + return this._generalObserve(filterString, () => publishReplay(1)); } /** diff --git a/src/rxjs/CoditionalReplaySubject.ts b/src/rxjs/CoditionalReplaySubject.ts deleted file mode 100644 index afee111..0000000 --- a/src/rxjs/CoditionalReplaySubject.ts +++ /dev/null @@ -1,141 +0,0 @@ -import { - ObjectUnsubscribedError, - SchedulerLike, - Subject, - Subscriber, - Subscription -} from 'rxjs'; -import { queue } from 'rxjs/internal/scheduler/queue'; -import { SubjectSubscription } from 'rxjs/internal/SubjectSubscription'; -import { ObserveOnSubscriber } from 'rxjs/internal/operators/observeOn'; -/** - * @class ConditionalReplaySubject - */ -export class ConditionalReplaySubject extends Subject { - private _events: (ReplayEvent | T)[] = []; - private _bufferSize: number; - private _windowTime: number; - private _infiniteTimeWindow: boolean = false; - private _expression: (value: T) => boolean; - - constructor(bufferSize: number = Number.POSITIVE_INFINITY, - windowTime: number = Number.POSITIVE_INFINITY, - private scheduler?: SchedulerLike, - expression?: (value: T) => boolean) { - super(); - this._bufferSize = bufferSize < 1 ? 1 : bufferSize; - this._windowTime = windowTime < 1 ? 1 : windowTime; - this._expression = expression ? expression : () => true; - - if (windowTime === Number.POSITIVE_INFINITY) { - this._infiniteTimeWindow = true; - this.next = this.nextInfiniteTimeWindow; - } else { - this.next = this.nextTimeWindow; - } - } - - private nextInfiniteTimeWindow(value: T): void { - const _events = this._events; - if (this._expression(value) === true) { - _events.push(value); - } - // Since this method is invoked in every next() call than the buffer - // can overgrow the max size only by one item - if (_events.length > this._bufferSize) { - _events.shift(); - } - - super.next(value); - } - - private nextTimeWindow(value: T): void { - if (this._expression(value) === true) { - this._events.push(new ReplayEvent(this._getNow(), value)); - this._trimBufferThenGetEvents(); - } - - super.next(value); - } - - /** @deprecated This is an internal implementation detail, do not use. */ - _subscribe(subscriber: Subscriber): Subscription { - // When `_infiniteTimeWindow === true` then the buffer is already trimmed - const _infiniteTimeWindow = this._infiniteTimeWindow; - const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents(); - const scheduler = this.scheduler; - const len = _events.length; - let subscription: Subscription; - - if (this.closed) { - throw new ObjectUnsubscribedError(); - } else if (this.isStopped || this.hasError) { - subscription = Subscription.EMPTY; - } else { - this.observers.push(subscriber); - subscription = new SubjectSubscription(this, subscriber); - } - - if (scheduler) { - subscriber.add(subscriber = new ObserveOnSubscriber(subscriber, scheduler)); - } - - if (_infiniteTimeWindow) { - for (let i = 0; i < len && !subscriber.closed; i++) { - subscriber.next(_events[i]); - } - } else { - for (let i = 0; i < len && !subscriber.closed; i++) { - subscriber.next((>_events[i]).value); - } - } - - if (this.hasError) { - subscriber.error(this.thrownError); - } else if (this.isStopped) { - subscriber.complete(); - } - - return subscription; - } - - _getNow(): number { - return (this.scheduler || queue).now(); - } - - private _trimBufferThenGetEvents(): ReplayEvent[] { - const now = this._getNow(); - const _bufferSize = this._bufferSize; - const _windowTime = this._windowTime; - const _events = []>this._events; - - const eventsCount = _events.length; - let spliceCount = 0; - - // Trim events that fall out of the time window. - // Start at the front of the list. Break early once - // we encounter an event that falls within the window. - while (spliceCount < eventsCount) { - if ((now - _events[spliceCount].time) < _windowTime) { - break; - } - spliceCount++; - } - - if (eventsCount > _bufferSize) { - spliceCount = Math.max(spliceCount, eventsCount - _bufferSize); - } - - if (spliceCount > 0) { - _events.splice(0, spliceCount); - } - - return _events; - } - -} - -class ReplayEvent { - constructor(public time: number, public value: T) { - } -} \ No newline at end of file diff --git a/src/rxjs/operators/publishReplayConditionally.ts b/src/rxjs/operators/publishReplayConditionally.ts deleted file mode 100644 index 7de6772..0000000 --- a/src/rxjs/operators/publishReplayConditionally.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { - ConnectableObservable, - MonoTypeOperatorFunction, - Observable, - OperatorFunction, - SchedulerLike, - UnaryFunction -} from 'rxjs'; - -import { ConditionalReplaySubject } from '../CoditionalReplaySubject'; -import { multicast } from 'rxjs/internal/operators/multicast'; - -/* tslint:disable:max-line-length */ -export function publishReplayConditionally(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; -export function publishReplayConditionally(bufferSize?: number, windowTime?: number, selector?: OperatorFunction, scheduler?: SchedulerLike): OperatorFunction; -export function publishReplayConditionally(bufferSize?: number, windowTime?: number, selector?: MonoTypeOperatorFunction, scheduler?: SchedulerLike): MonoTypeOperatorFunction; -export function publishReplayConditionally(bufferSize?: number, windowTime?: number, selector?: MonoTypeOperatorFunction, scheduler?: SchedulerLike, expression?: (value: T) => boolean): MonoTypeOperatorFunction; -/* tslint:enable:max-line-length */ - -export function publishReplayConditionally(bufferSize?: number, - windowTime?: number, - selectorOrScheduler?: SchedulerLike | OperatorFunction, - scheduler?: SchedulerLike, - expression?: (value: T) => boolean): UnaryFunction, ConnectableObservable> { - - if (selectorOrScheduler && typeof selectorOrScheduler !== 'function') { - scheduler = selectorOrScheduler; - } - - const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined; - const subject = new ConditionalReplaySubject(bufferSize, windowTime, scheduler, expression); - - return (source: Observable) => multicast(() => subject, selector)(source) as ConnectableObservable; -} \ No newline at end of file