diff --git a/quartz-manager-frontend/angular.json b/quartz-manager-frontend/angular.json index 307e06d..21d6d19 100644 --- a/quartz-manager-frontend/angular.json +++ b/quartz-manager-frontend/angular.json @@ -19,7 +19,7 @@ "tsConfig": "src/tsconfig.app.json", "polyfills": "src/polyfills.ts", "allowedCommonJsDependencies": [ - "stompjs", "sockjs-client", "moment" + "stompjs", "sockjs-client", "moment", "angular2-uuid" ], "assets": [ "src/assets", diff --git a/quartz-manager-frontend/src/app/app.module.ts b/quartz-manager-frontend/src/app/app.module.ts index 8dea7b1..54fbc83 100644 --- a/quartz-manager-frontend/src/app/app.module.ts +++ b/quartz-manager-frontend/src/app/app.module.ts @@ -43,7 +43,8 @@ import { SchedulerControlComponent, LogsPanelComponent, ProgressPanelComponent, - TriggerListComponent + TriggerListComponent, + SimpleTriggerConfigComponent } from './components'; import { @@ -52,17 +53,15 @@ import { UserService, SchedulerService, ConfigService, - ProgressWebsocketService, - LogsWebsocketService, getHtmlBaseUrl, + LogsRxWebsocketService, + ProgressRxWebsocketService, TriggerService } from './services'; import { ForbiddenComponent } from './views/forbidden/forbidden.component'; import { APP_BASE_HREF } from '@angular/common'; -import {SimpleTriggerConfigComponent} from './components/simple-trigger-config'; import JobService from './services/job.service'; import {GenericErrorComponent} from './views/error/genericError.component'; -import {LogsRxWebsocketService} from './services/logs.rx-websocket.service'; export function initUserFactory(userService: UserService) { return () => userService.fetchLoggedUser(); @@ -144,8 +143,7 @@ export function jwtOptionsFactory(apiService: ApiService) { SchedulerService, JobService, TriggerService, - ProgressWebsocketService, - // LogsWebsocketService, + ProgressRxWebsocketService, LogsRxWebsocketService, AuthService, ApiService, diff --git a/quartz-manager-frontend/src/app/components/logs-panel/logs-panel.component.ts b/quartz-manager-frontend/src/app/components/logs-panel/logs-panel.component.ts index 069747b..ec76d25 100644 --- a/quartz-manager-frontend/src/app/components/logs-panel/logs-panel.component.ts +++ b/quartz-manager-frontend/src/app/components/logs-panel/logs-panel.component.ts @@ -26,7 +26,6 @@ export class LogsPanelComponent implements OnInit, OnDestroy { private selectedTriggerKey: TriggerKey; constructor( - // private logsWebsocketService: LogsWebsocketService, private logsRxWebsocketService: LogsRxWebsocketService, private apiService: ApiService ) { @@ -41,21 +40,6 @@ export class LogsPanelComponent implements OnInit, OnDestroy { } ngOnInit() { - // const obs = this.logsWebsocketService.getObservable() - // obs.subscribe({ - // 'next': this.onNewLogMsg, - // 'error': (err) => { - // console.log(err) - // } - // }); - - // this.topicSubscription = this.logsRxWebsocketService.watch('/topic/logs') - // .pipe(map(msg => JSON.parse(msg.body))) - // .subscribe(this._showNewLog, (err) => { - // console.log(err); - // // TODO in case of 401 - // // this.apiService.get('/quartz-manager/session/refresh'); - // }); } private _subscribeToTheTopic = (triggerKey: TriggerKey) => { @@ -79,14 +63,6 @@ export class LogsPanelComponent implements OnInit, OnDestroy { this.topicSubscription = null; } - // onNewLogMsg = (receivedMsg) => { - // if (receivedMsg.body.type === 'SUCCESS') { - // this._showNewLog(receivedMsg.body.message); - // } else if (receivedMsg.body.type === 'ERROR') { - // this._refreshSession(); - // } // if websocket has been closed for session expiration, try to refresh it - // }; - _showNewLog = (logRecord) => { if (this.logs.length > this.MAX_LOGS) { this.logs.pop(); diff --git a/quartz-manager-frontend/src/app/components/progress-panel/progress-panel.component.ts b/quartz-manager-frontend/src/app/components/progress-panel/progress-panel.component.ts index 0d50777..8cdc5ba 100644 --- a/quartz-manager-frontend/src/app/components/progress-panel/progress-panel.component.ts +++ b/quartz-manager-frontend/src/app/components/progress-panel/progress-panel.component.ts @@ -1,84 +1,61 @@ -import { Component, OnInit, Input, Output, EventEmitter } from '@angular/core' -import {ProgressWebsocketService, QuartzManagerWebsocketMessage} from '../../services'; - -import { Observable } from 'rxjs'; +import {Component, Input, OnDestroy, OnInit} from '@angular/core' import TriggerFiredBundle from '../../model/trigger-fired-bundle.model'; -// import {Message} from '@stomp/stompjs'; - -// import { Subscription } from 'rxjs/Subscription'; -// import {StompService} from '@stomp/ng2-stompjs'; - -// import { QueueingSubject } from 'queueing-subject' -// import websocketConnect from 'rxjs-websockets' -// import 'rxjs/add/operator/share' -// import {ServerSocket} from '../../services/qz.socket.service' +import {TriggerKey} from '../../model/triggerKey.model'; +import {ProgressRxWebsocketService} from '../../services/progress.rx-websocket.service'; +import {map} from 'rxjs/operators'; @Component({ selector: 'progress-panel', templateUrl: './progress-panel.component.html', styleUrls: ['./progress-panel.component.scss'] }) -export class ProgressPanelComponent implements OnInit { +export class ProgressPanelComponent implements OnInit, OnDestroy { progress: TriggerFiredBundle = new TriggerFiredBundle(); percentageStr: string; - // // Stream of messages - // private subscription: Subscription; - // public messages: Observable; - // // Subscription status - // public subscribed: boolean; - // // Array of historic message (bodies) - // public mq: Array = []; - + topicSubscription; + private selectedTriggerKey: TriggerKey; constructor( - private progressWebsocketService: ProgressWebsocketService, - // private _stompService: StompService, - // private serverSocket : ServerSocket + private progressRxWebsocketService: ProgressRxWebsocketService ) { } - onNewProgressMsg = (receivedMsg: QuartzManagerWebsocketMessage) => { - if (receivedMsg.type === 'SUCCESS') { - const newStatus = receivedMsg.message; - this.progress = newStatus; - this.percentageStr = this.progress.percentage + '%'; + @Input() + set triggerKey(triggerKey: TriggerKey) { + this.selectedTriggerKey = {...triggerKey} as TriggerKey; + if (this.selectedTriggerKey && this.selectedTriggerKey.name) { + this._subscribeToTheTopic(this.selectedTriggerKey); } } - ngOnInit() { - const obs = this.progressWebsocketService.getObservable() - obs.subscribe({ - 'next' : this.onNewProgressMsg, - 'error' : (err) => {console.log(err)} - }); + private _subscribeToTheTopic = (triggerKey: TriggerKey) => { + if (this.topicSubscription) { + this.topicSubscription.unsubscribe(); + } + this.topicSubscription = this.progressRxWebsocketService.watch(`/topic/progress/${triggerKey.name}`) + .pipe(map(msg => JSON.parse(msg.body))) + .subscribe(this.onNewProgressMsg, (err) => { + console.log(err); + // TODO in case of 401 + // this.apiService.get('/quartz-manager/session/refresh'); + }); + }; - // this.subscribed = false; - // this.subscribe(); - - // this.serverSocket.connect() - // this.socketSubscription = this.serverSocket.messages.subscribe((message: string) => { - // console.log('received message from server: ', message) - // }) + onNewProgressMsg = (receivedMsg) => { + this.progress = receivedMsg; + this.percentageStr = this.progress.percentage + '%'; } - // public subscribe() { - // if (this.subscribed) { - // return; - // } + ngOnInit() { + } - // // Stream of messages - // this.messages = this._stompService.subscribe('/topic/progress'); - - // // Subscribe a function to be run on_next message - // this.subscription = this.messages.subscribe(this.on_next); - - // this.subscribed = true; - // } - - // public on_next = (message: Message) => { - // this.mq.push(message.body + '\n'); - // console.log(message); - // } + ngOnDestroy() { + if (this.topicSubscription) { + this.topicSubscription.unsubscribe(); + } + this.topicSubscription.unsubscribe(); + this.topicSubscription = null; + } } diff --git a/quartz-manager-frontend/src/app/services/index.ts b/quartz-manager-frontend/src/app/services/index.ts index a3e5802..81435d0 100644 --- a/quartz-manager-frontend/src/app/services/index.ts +++ b/quartz-manager-frontend/src/app/services/index.ts @@ -3,9 +3,8 @@ export * from './user.service'; export * from './config.service'; export * from './auth.service'; export * from './scheduler.service'; -export * from './websocket.service'; -export * from './progress.websocket.service'; -export * from './logs.websocket.service'; +export * from './progress.rx-websocket.service'; +export * from './logs.rx-websocket.service'; export * from './trigger.service' export * from './job.service' diff --git a/quartz-manager-frontend/src/app/services/logs.websocket.service.ts b/quartz-manager-frontend/src/app/services/logs.websocket.service.ts deleted file mode 100644 index 97e77dd..0000000 --- a/quartz-manager-frontend/src/app/services/logs.websocket.service.ts +++ /dev/null @@ -1,12 +0,0 @@ -import {Injectable} from '@angular/core'; -import {WebsocketService, ApiService, getBaseUrl, CONTEXT_PATH} from '.'; -import {SocketOption} from '../model/SocketOption.model'; - -@Injectable() -export class LogsWebsocketService extends WebsocketService { - - constructor(private apiService: ApiService) { - super(new SocketOption(getBaseUrl() + `${CONTEXT_PATH}/logs`, '/topic/logs', apiService.getToken)) - } - -} diff --git a/quartz-manager-frontend/src/app/services/progress.rx-websocket.service.ts b/quartz-manager-frontend/src/app/services/progress.rx-websocket.service.ts new file mode 100644 index 0000000..647f668 --- /dev/null +++ b/quartz-manager-frontend/src/app/services/progress.rx-websocket.service.ts @@ -0,0 +1,23 @@ +import { Injectable } from '@angular/core'; +import {RxStompService} from './rx-stomp.service'; +import {ApiService} from './api.service'; +import SockJS from 'sockjs-client'; +import {CONTEXT_PATH, getBaseUrl} from './config.service'; + +@Injectable({ + providedIn: 'root' +}) +export class ProgressRxWebsocketService extends RxStompService { + + constructor(private apiService: ApiService) { + super({ + webSocketFactory: () => new SockJS(`${getBaseUrl()}${CONTEXT_PATH}/progress?access_token=${this.apiService.getToken()}`), + heartbeatIncoming: 0, + heartbeatOutgoing: 20000, + reconnectDelay: 200, + debug: (msg: string): void => { + console.log(new Date(), msg); + } + }); + } +} diff --git a/quartz-manager-frontend/src/app/services/progress.websocket.service.ts b/quartz-manager-frontend/src/app/services/progress.websocket.service.ts deleted file mode 100644 index 7322e6f..0000000 --- a/quartz-manager-frontend/src/app/services/progress.websocket.service.ts +++ /dev/null @@ -1,12 +0,0 @@ -import {Injectable} from '@angular/core'; -import {WebsocketService, ApiService, getBaseUrl, CONTEXT_PATH} from '.'; -import {SocketOption} from '../model/SocketOption.model'; - -@Injectable() -export class ProgressWebsocketService extends WebsocketService { - - constructor(private apiService: ApiService) { - super(new SocketOption(getBaseUrl() + `${CONTEXT_PATH}/progress`, '/topic/progress', apiService.getToken)) - } - -} diff --git a/quartz-manager-frontend/src/app/services/websocket.service.ts b/quartz-manager-frontend/src/app/services/websocket.service.ts deleted file mode 100644 index 11b4342..0000000 --- a/quartz-manager-frontend/src/app/services/websocket.service.ts +++ /dev/null @@ -1,136 +0,0 @@ -import {Observable, Subscriber} from 'rxjs'; -import {SocketEndpoint} from '../model/SocketEndpoint.model' - - -import Stomp from 'stompjs'; -import SockJS from 'sockjs-client'; -import {SocketOption} from '../model/SocketOption.model'; - -interface WebsocketSubscriber { - index: number, - observer: Subscriber -} - -export interface QuartzManagerWebsocketMessage { - type: string; - message: any; - headers: any; - self: boolean; -} - -export class WebsocketService { - - _options: SocketOption; - - _socket: SocketEndpoint = new SocketEndpoint(); - - observableStompConnection: Observable; - subscribers: Array = []; - subscriberIndex = 0; - - _messageIds: Array = []; - - reconnectionPromise: any; - - constructor(options: SocketOption) { - this._options = options - this.createObservableSocket(); - this.connect(); - } - - getOptions = () => { - } - - private createObservableSocket = () => { - this.observableStompConnection = new Observable((observer) => { - const subscriberIndex = this.subscriberIndex++; - this.addToSubscribers({index: subscriberIndex, observer}); - return () => this.removeFromSubscribers(subscriberIndex); - }); - } - - private addToSubscribers = (subscriber) => { - this.subscribers.push(subscriber); - } - - private removeFromSubscribers = (index) => { - this.subscribers = this.subscribers.filter(subscriber => subscriber.index !== index); - } - - getObservable = () => { - return this.observableStompConnection; - }; - - getMessage = function (data): QuartzManagerWebsocketMessage { - const out: QuartzManagerWebsocketMessage = {}; - out.type = 'SUCCESS'; - out.message = JSON.parse(data.body); - out.headers = {}; - out.headers.messageId = data.headers['message-id']; - - const messageIdIndex = this._messageIds.indexOf(out.headers.messageId); - if (messageIdIndex > -1) { - out.self = true; - this._messageIds = this._messageIds.splice(messageIdIndex, 1); - } - return out; - }; - - _socketListener = (frame) => { - console.log(`Connected to ${this._options.socketUrl}: ${frame}`); - this._socket.stomp.subscribe( - this._options.topicName, - data => this.subscribers.forEach(subscriber => subscriber.observer.next(this.getMessage(data))) - ); - } - - _onSocketError = (errorMsg) => { - const out: any = {}; - out.type = 'ERROR'; - out.message = errorMsg; - this.subscribers.forEach(subscriber => subscriber.observer.error(out)); - this.scheduleReconnection(); - } - - scheduleReconnection = () => { - this.reconnectionPromise = setTimeout(() => { - console.log(`Socket reconnecting to ${this._options.socketUrl}... (if it fails, next attempt in ${this._options.reconnectionTimeout} msec)`); - this.connect(); - }, this._options.reconnectionTimeout); - } - - reconnectNow = function () { - this._socket.stomp.disconnect(); - if (this.reconnectionPromise && this.reconnectionPromise.cancel) { - this.reconnectionPromise.cancel(); - } - this.connect(); - }; - - send = (message) => { - const id = Math.floor(Math.random() * 1000000); - this._socket.stomp.send(this._options.brokerName, { - priority: 9 - }, JSON.stringify({ - message: message, - id: id - })); - this._messageIds.push(id); - }; - - connect = () => { - const headers = {}; - - let socketUrl = this._options.socketUrl; - if (this._options.getAccessToken()) { - socketUrl += `?access_token=${this._options.getAccessToken()}`; - } - - this._socket.client = new SockJS(socketUrl); - this._socket.stomp = Stomp.over(this._socket.client); - this._socket.stomp.connect(headers, this._socketListener, this._onSocketError); - this._socket.stomp.onclose = this.scheduleReconnection; - } - - -} diff --git a/quartz-manager-frontend/src/app/views/manager/manager.component.html b/quartz-manager-frontend/src/app/views/manager/manager.component.html index c03d20c..cc71362 100644 --- a/quartz-manager-frontend/src/app/views/manager/manager.component.html +++ b/quartz-manager-frontend/src/app/views/manager/manager.component.html @@ -29,7 +29,10 @@
- + + diff --git a/quartz-manager-parent/quartz-manager-starter-api/src/main/java/it/fabioformosa/quartzmanager/api/websockets/WebSocketProgressNotifier.java b/quartz-manager-parent/quartz-manager-starter-api/src/main/java/it/fabioformosa/quartzmanager/api/websockets/WebSocketProgressNotifier.java index 281d744..d616bf9 100644 --- a/quartz-manager-parent/quartz-manager-starter-api/src/main/java/it/fabioformosa/quartzmanager/api/websockets/WebSocketProgressNotifier.java +++ b/quartz-manager-parent/quartz-manager-starter-api/src/main/java/it/fabioformosa/quartzmanager/api/websockets/WebSocketProgressNotifier.java @@ -21,7 +21,7 @@ public class WebSocketProgressNotifier implements WebhookSender