#103 migrated the progress websocket to rx-stomp

This commit is contained in:
Fabio Formosa
2022-12-20 23:58:51 +01:00
parent 261dd8b624
commit e4c771e364
11 changed files with 73 additions and 257 deletions

View File

@@ -19,7 +19,7 @@
"tsConfig": "src/tsconfig.app.json", "tsConfig": "src/tsconfig.app.json",
"polyfills": "src/polyfills.ts", "polyfills": "src/polyfills.ts",
"allowedCommonJsDependencies": [ "allowedCommonJsDependencies": [
"stompjs", "sockjs-client", "moment" "stompjs", "sockjs-client", "moment", "angular2-uuid"
], ],
"assets": [ "assets": [
"src/assets", "src/assets",

View File

@@ -43,7 +43,8 @@ import {
SchedulerControlComponent, SchedulerControlComponent,
LogsPanelComponent, LogsPanelComponent,
ProgressPanelComponent, ProgressPanelComponent,
TriggerListComponent TriggerListComponent,
SimpleTriggerConfigComponent
} from './components'; } from './components';
import { import {
@@ -52,17 +53,15 @@ import {
UserService, UserService,
SchedulerService, SchedulerService,
ConfigService, ConfigService,
ProgressWebsocketService,
LogsWebsocketService,
getHtmlBaseUrl, getHtmlBaseUrl,
LogsRxWebsocketService,
ProgressRxWebsocketService,
TriggerService TriggerService
} from './services'; } from './services';
import { ForbiddenComponent } from './views/forbidden/forbidden.component'; import { ForbiddenComponent } from './views/forbidden/forbidden.component';
import { APP_BASE_HREF } from '@angular/common'; import { APP_BASE_HREF } from '@angular/common';
import {SimpleTriggerConfigComponent} from './components/simple-trigger-config';
import JobService from './services/job.service'; import JobService from './services/job.service';
import {GenericErrorComponent} from './views/error/genericError.component'; import {GenericErrorComponent} from './views/error/genericError.component';
import {LogsRxWebsocketService} from './services/logs.rx-websocket.service';
export function initUserFactory(userService: UserService) { export function initUserFactory(userService: UserService) {
return () => userService.fetchLoggedUser(); return () => userService.fetchLoggedUser();
@@ -144,8 +143,7 @@ export function jwtOptionsFactory(apiService: ApiService) {
SchedulerService, SchedulerService,
JobService, JobService,
TriggerService, TriggerService,
ProgressWebsocketService, ProgressRxWebsocketService,
// LogsWebsocketService,
LogsRxWebsocketService, LogsRxWebsocketService,
AuthService, AuthService,
ApiService, ApiService,

View File

@@ -26,7 +26,6 @@ export class LogsPanelComponent implements OnInit, OnDestroy {
private selectedTriggerKey: TriggerKey; private selectedTriggerKey: TriggerKey;
constructor( constructor(
// private logsWebsocketService: LogsWebsocketService,
private logsRxWebsocketService: LogsRxWebsocketService, private logsRxWebsocketService: LogsRxWebsocketService,
private apiService: ApiService private apiService: ApiService
) { ) {
@@ -41,21 +40,6 @@ export class LogsPanelComponent implements OnInit, OnDestroy {
} }
ngOnInit() { ngOnInit() {
// const obs = this.logsWebsocketService.getObservable()
// obs.subscribe({
// 'next': this.onNewLogMsg,
// 'error': (err) => {
// console.log(err)
// }
// });
// this.topicSubscription = this.logsRxWebsocketService.watch('/topic/logs')
// .pipe(map(msg => JSON.parse(msg.body)))
// .subscribe(this._showNewLog, (err) => {
// console.log(err);
// // TODO in case of 401
// // this.apiService.get('/quartz-manager/session/refresh');
// });
} }
private _subscribeToTheTopic = (triggerKey: TriggerKey) => { private _subscribeToTheTopic = (triggerKey: TriggerKey) => {
@@ -79,14 +63,6 @@ export class LogsPanelComponent implements OnInit, OnDestroy {
this.topicSubscription = null; this.topicSubscription = null;
} }
// onNewLogMsg = (receivedMsg) => {
// if (receivedMsg.body.type === 'SUCCESS') {
// this._showNewLog(receivedMsg.body.message);
// } else if (receivedMsg.body.type === 'ERROR') {
// this._refreshSession();
// } // if websocket has been closed for session expiration, try to refresh it
// };
_showNewLog = (logRecord) => { _showNewLog = (logRecord) => {
if (this.logs.length > this.MAX_LOGS) { if (this.logs.length > this.MAX_LOGS) {
this.logs.pop(); this.logs.pop();

View File

@@ -1,84 +1,61 @@
import { Component, OnInit, Input, Output, EventEmitter } from '@angular/core' import {Component, Input, OnDestroy, OnInit} from '@angular/core'
import {ProgressWebsocketService, QuartzManagerWebsocketMessage} from '../../services';
import { Observable } from 'rxjs';
import TriggerFiredBundle from '../../model/trigger-fired-bundle.model'; import TriggerFiredBundle from '../../model/trigger-fired-bundle.model';
// import {Message} from '@stomp/stompjs'; import {TriggerKey} from '../../model/triggerKey.model';
import {ProgressRxWebsocketService} from '../../services/progress.rx-websocket.service';
// import { Subscription } from 'rxjs/Subscription'; import {map} from 'rxjs/operators';
// import {StompService} from '@stomp/ng2-stompjs';
// import { QueueingSubject } from 'queueing-subject'
// import websocketConnect from 'rxjs-websockets'
// import 'rxjs/add/operator/share'
// import {ServerSocket} from '../../services/qz.socket.service'
@Component({ @Component({
selector: 'progress-panel', selector: 'progress-panel',
templateUrl: './progress-panel.component.html', templateUrl: './progress-panel.component.html',
styleUrls: ['./progress-panel.component.scss'] styleUrls: ['./progress-panel.component.scss']
}) })
export class ProgressPanelComponent implements OnInit { export class ProgressPanelComponent implements OnInit, OnDestroy {
progress: TriggerFiredBundle = new TriggerFiredBundle(); progress: TriggerFiredBundle = new TriggerFiredBundle();
percentageStr: string; percentageStr: string;
// // Stream of messages topicSubscription;
// private subscription: Subscription; private selectedTriggerKey: TriggerKey;
// public messages: Observable<Message>;
// // Subscription status
// public subscribed: boolean;
// // Array of historic message (bodies)
// public mq: Array<string> = [];
constructor( constructor(
private progressWebsocketService: ProgressWebsocketService, private progressRxWebsocketService: ProgressRxWebsocketService
// private _stompService: StompService,
// private serverSocket : ServerSocket
) { } ) { }
onNewProgressMsg = (receivedMsg: QuartzManagerWebsocketMessage) => { @Input()
if (receivedMsg.type === 'SUCCESS') { set triggerKey(triggerKey: TriggerKey) {
const newStatus = receivedMsg.message; this.selectedTriggerKey = {...triggerKey} as TriggerKey;
this.progress = newStatus; if (this.selectedTriggerKey && this.selectedTriggerKey.name) {
this.percentageStr = this.progress.percentage + '%'; this._subscribeToTheTopic(this.selectedTriggerKey);
} }
} }
private _subscribeToTheTopic = (triggerKey: TriggerKey) => {
if (this.topicSubscription) {
this.topicSubscription.unsubscribe();
}
this.topicSubscription = this.progressRxWebsocketService.watch(`/topic/progress/${triggerKey.name}`)
.pipe(map(msg => JSON.parse(msg.body)))
.subscribe(this.onNewProgressMsg, (err) => {
console.log(err);
// TODO in case of 401
// this.apiService.get('/quartz-manager/session/refresh');
});
};
onNewProgressMsg = (receivedMsg) => {
this.progress = receivedMsg;
this.percentageStr = this.progress.percentage + '%';
}
ngOnInit() { ngOnInit() {
const obs = this.progressWebsocketService.getObservable()
obs.subscribe({
'next' : this.onNewProgressMsg,
'error' : (err) => {console.log(err)}
});
// this.subscribed = false;
// this.subscribe();
// this.serverSocket.connect()
// this.socketSubscription = this.serverSocket.messages.subscribe((message: string) => {
// console.log('received message from server: ', message)
// })
} }
// public subscribe() { ngOnDestroy() {
// if (this.subscribed) { if (this.topicSubscription) {
// return; this.topicSubscription.unsubscribe();
// } }
this.topicSubscription.unsubscribe();
// // Stream of messages this.topicSubscription = null;
// this.messages = this._stompService.subscribe('/topic/progress'); }
// // Subscribe a function to be run on_next message
// this.subscription = this.messages.subscribe(this.on_next);
// this.subscribed = true;
// }
// public on_next = (message: Message) => {
// this.mq.push(message.body + '\n');
// console.log(message);
// }
} }

View File

@@ -3,9 +3,8 @@ export * from './user.service';
export * from './config.service'; export * from './config.service';
export * from './auth.service'; export * from './auth.service';
export * from './scheduler.service'; export * from './scheduler.service';
export * from './websocket.service'; export * from './progress.rx-websocket.service';
export * from './progress.websocket.service'; export * from './logs.rx-websocket.service';
export * from './logs.websocket.service';
export * from './trigger.service' export * from './trigger.service'
export * from './job.service' export * from './job.service'

View File

@@ -1,12 +0,0 @@
import {Injectable} from '@angular/core';
import {WebsocketService, ApiService, getBaseUrl, CONTEXT_PATH} from '.';
import {SocketOption} from '../model/SocketOption.model';
@Injectable()
export class LogsWebsocketService extends WebsocketService {
constructor(private apiService: ApiService) {
super(new SocketOption(getBaseUrl() + `${CONTEXT_PATH}/logs`, '/topic/logs', apiService.getToken))
}
}

View File

@@ -0,0 +1,23 @@
import { Injectable } from '@angular/core';
import {RxStompService} from './rx-stomp.service';
import {ApiService} from './api.service';
import SockJS from 'sockjs-client';
import {CONTEXT_PATH, getBaseUrl} from './config.service';
@Injectable({
providedIn: 'root'
})
export class ProgressRxWebsocketService extends RxStompService {
constructor(private apiService: ApiService) {
super({
webSocketFactory: () => new SockJS(`${getBaseUrl()}${CONTEXT_PATH}/progress?access_token=${this.apiService.getToken()}`),
heartbeatIncoming: 0,
heartbeatOutgoing: 20000,
reconnectDelay: 200,
debug: (msg: string): void => {
console.log(new Date(), msg);
}
});
}
}

View File

@@ -1,12 +0,0 @@
import {Injectable} from '@angular/core';
import {WebsocketService, ApiService, getBaseUrl, CONTEXT_PATH} from '.';
import {SocketOption} from '../model/SocketOption.model';
@Injectable()
export class ProgressWebsocketService extends WebsocketService {
constructor(private apiService: ApiService) {
super(new SocketOption(getBaseUrl() + `${CONTEXT_PATH}/progress`, '/topic/progress', apiService.getToken))
}
}

View File

@@ -1,136 +0,0 @@
import {Observable, Subscriber} from 'rxjs';
import {SocketEndpoint} from '../model/SocketEndpoint.model'
import Stomp from 'stompjs';
import SockJS from 'sockjs-client';
import {SocketOption} from '../model/SocketOption.model';
interface WebsocketSubscriber {
index: number,
observer: Subscriber<any>
}
export interface QuartzManagerWebsocketMessage {
type: string;
message: any;
headers: any;
self: boolean;
}
export class WebsocketService {
_options: SocketOption;
_socket: SocketEndpoint = new SocketEndpoint();
observableStompConnection: Observable<any>;
subscribers: Array<WebsocketSubscriber> = [];
subscriberIndex = 0;
_messageIds: Array<any> = [];
reconnectionPromise: any;
constructor(options: SocketOption) {
this._options = options
this.createObservableSocket();
this.connect();
}
getOptions = () => {
}
private createObservableSocket = () => {
this.observableStompConnection = new Observable((observer) => {
const subscriberIndex = this.subscriberIndex++;
this.addToSubscribers({index: subscriberIndex, observer});
return () => this.removeFromSubscribers(subscriberIndex);
});
}
private addToSubscribers = (subscriber) => {
this.subscribers.push(subscriber);
}
private removeFromSubscribers = (index) => {
this.subscribers = this.subscribers.filter(subscriber => subscriber.index !== index);
}
getObservable = () => {
return this.observableStompConnection;
};
getMessage = function (data): QuartzManagerWebsocketMessage {
const out: QuartzManagerWebsocketMessage = <QuartzManagerWebsocketMessage>{};
out.type = 'SUCCESS';
out.message = JSON.parse(data.body);
out.headers = {};
out.headers.messageId = data.headers['message-id'];
const messageIdIndex = this._messageIds.indexOf(out.headers.messageId);
if (messageIdIndex > -1) {
out.self = true;
this._messageIds = this._messageIds.splice(messageIdIndex, 1);
}
return out;
};
_socketListener = (frame) => {
console.log(`Connected to ${this._options.socketUrl}: ${frame}`);
this._socket.stomp.subscribe(
this._options.topicName,
data => this.subscribers.forEach(subscriber => subscriber.observer.next(this.getMessage(data)))
);
}
_onSocketError = (errorMsg) => {
const out: any = {};
out.type = 'ERROR';
out.message = errorMsg;
this.subscribers.forEach(subscriber => subscriber.observer.error(out));
this.scheduleReconnection();
}
scheduleReconnection = () => {
this.reconnectionPromise = setTimeout(() => {
console.log(`Socket reconnecting to ${this._options.socketUrl}... (if it fails, next attempt in ${this._options.reconnectionTimeout} msec)`);
this.connect();
}, this._options.reconnectionTimeout);
}
reconnectNow = function () {
this._socket.stomp.disconnect();
if (this.reconnectionPromise && this.reconnectionPromise.cancel) {
this.reconnectionPromise.cancel();
}
this.connect();
};
send = (message) => {
const id = Math.floor(Math.random() * 1000000);
this._socket.stomp.send(this._options.brokerName, {
priority: 9
}, JSON.stringify({
message: message,
id: id
}));
this._messageIds.push(id);
};
connect = () => {
const headers = {};
let socketUrl = this._options.socketUrl;
if (this._options.getAccessToken()) {
socketUrl += `?access_token=${this._options.getAccessToken()}`;
}
this._socket.client = new SockJS(socketUrl);
this._socket.stomp = Stomp.over(this._socket.client);
this._socket.stomp.connect(headers, this._socketListener, this._onSocketError);
this._socket.stomp.onclose = this.scheduleReconnection;
}
}

View File

@@ -29,7 +29,10 @@
<div fxFlex="1 1 auto" style="margin-left: 20px;"> <div fxFlex="1 1 auto" style="margin-left: 20px;">
<div fxFlex="1 1 auto" fxLayout="column" fxLayoutAlign="start stretch" fxLayoutGap="6px"> <div fxFlex="1 1 auto" fxLayout="column" fxLayoutAlign="start stretch" fxLayoutGap="6px">
<progress-panel></progress-panel> <progress-panel
[triggerKey]=selectedTriggerKey
>
</progress-panel>
<logs-panel fxFlex="1 1 auto" fxFill <logs-panel fxFlex="1 1 auto" fxFill
[triggerKey]=selectedTriggerKey [triggerKey]=selectedTriggerKey
> >

View File

@@ -21,7 +21,7 @@ public class WebSocketProgressNotifier implements WebhookSender<TriggerFiredBund
@Override @Override
public void send(String triggerName, TriggerFiredBundleDTO triggerFiredBundleDTO) { public void send(String triggerName, TriggerFiredBundleDTO triggerFiredBundleDTO) {
messagingTemplate.convertAndSend(TOPIC_PROGRESS, triggerFiredBundleDTO); messagingTemplate.convertAndSend(TOPIC_PROGRESS + "/" + triggerName, triggerFiredBundleDTO);
} }
public static TriggerFiredBundleDTO buildTriggerFiredBundle(JobExecutionContext jobExecutionContext) { public static TriggerFiredBundleDTO buildTriggerFiredBundle(JobExecutionContext jobExecutionContext) {