Node errorhandling (#170)
* added code * removed cdk code * added code examples * added ts files * added code * updated code * error handling code * Update nodejs/errorhandling/js/index.js Co-authored-by: Pratik Das <pratikd2001@gmail.com> Co-authored-by: Tom Hombergs <tom.hombergs@gmail.com>
This commit is contained in:
@@ -28,6 +28,63 @@ public class ErrorCounter {
|
||||
|
||||
DataStream<String> inputStream = createSource(env);
|
||||
|
||||
DataStream<LogRecord> logRecords = mapStringToLogRecord(inputStream);
|
||||
|
||||
DataStream<LogRecord> errorRecords = filterErrorRecords(logRecords);
|
||||
|
||||
DataStream<LogRecord> keyedStream = assignIPasKey(errorRecords);
|
||||
|
||||
|
||||
DataStream<String> keyedStreamAsText = mapLogRecordToString(keyedStream);
|
||||
|
||||
//TODO Uncomment this code for deploying to Kinesis Data Analytics
|
||||
|
||||
// keyedStream.addSink(createSink());
|
||||
|
||||
keyedStreamAsText.print();
|
||||
|
||||
env.execute("Error alerts");
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static DataStream<String> mapLogRecordToString(DataStream<LogRecord> keyedStream) {
|
||||
DataStream<String> keyedStreamAsText = keyedStream.flatMap(new FlatMapFunction<LogRecord, String>() {
|
||||
|
||||
@Override
|
||||
public void flatMap(LogRecord value, Collector<String> out) throws Exception {
|
||||
out.collect(value.getUrl()+"::" + value.getHttpStatus());
|
||||
}
|
||||
});
|
||||
return keyedStreamAsText;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static DataStream<LogRecord> assignIPasKey(DataStream<LogRecord> errorRecords) {
|
||||
DataStream<LogRecord> keyedStream = errorRecords.keyBy(value -> value.getIp());
|
||||
return keyedStream;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static DataStream<LogRecord> filterErrorRecords(DataStream<LogRecord> logRecords) {
|
||||
DataStream<LogRecord> errorRecords = logRecords.filter(new FilterFunction<LogRecord>() {
|
||||
|
||||
@Override
|
||||
public boolean filter(LogRecord value) throws Exception {
|
||||
boolean matched = !value.getHttpStatus().equalsIgnoreCase("200");
|
||||
|
||||
return matched;
|
||||
}
|
||||
});
|
||||
return errorRecords;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static DataStream<LogRecord> mapStringToLogRecord(DataStream<String> inputStream) {
|
||||
DataStream<LogRecord> logRecords = inputStream.flatMap(new FlatMapFunction<String, LogRecord>() {
|
||||
|
||||
@Override
|
||||
@@ -45,34 +102,10 @@ public class ErrorCounter {
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
DataStream<LogRecord> errorRecords = logRecords.filter(new FilterFunction<LogRecord>() {
|
||||
|
||||
@Override
|
||||
public boolean filter(LogRecord value) throws Exception {
|
||||
boolean matched = !value.getHttpStatus().equalsIgnoreCase("200");
|
||||
|
||||
return matched;
|
||||
}
|
||||
});
|
||||
|
||||
DataStream<String> keyedStream = errorRecords.keyBy(value -> value.getIp()).flatMap(new FlatMapFunction<LogRecord, String>() {
|
||||
|
||||
@Override
|
||||
public void flatMap(LogRecord value, Collector<String> out) throws Exception {
|
||||
out.collect(value.getUrl()+"::" + value.getHttpStatus());
|
||||
}
|
||||
});
|
||||
|
||||
//TODO Uncomment this code for deploying to Kinesis Data Analytics
|
||||
|
||||
// keyedStream.addSink(createSink());
|
||||
|
||||
keyedStream.print();
|
||||
|
||||
env.execute("Error alerts");
|
||||
|
||||
return logRecords;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*private static void createSink(final StreamExecutionEnvironment env, DataStream<LogRecord> input) {
|
||||
input.print();
|
||||
|
||||
33
aws/kinesis/src/main/resources/lambda_function/index.js
Normal file
33
aws/kinesis/src/main/resources/lambda_function/index.js
Normal file
@@ -0,0 +1,33 @@
|
||||
console.log('Loading function');
|
||||
|
||||
const validateRecord = (recordElement)=>{
|
||||
// record is considered valid if contains status field
|
||||
return recordElement.includes("status")
|
||||
}
|
||||
|
||||
exports.handler = async (event, context) => {
|
||||
/* Process the list of records and transform them */
|
||||
const output = event.records.map((record)=>{
|
||||
const decodedData = Buffer.from(record.data, "base64").toString("utf-8")
|
||||
let isValidRecord = validateRecord(decodedData)
|
||||
|
||||
if(isValidRecord){
|
||||
let parsedRecord = JSON.parse(decodedData)
|
||||
// read fields from parsed JSON for some more processing
|
||||
const outputRecord = `status::${parsedRecord.status}`
|
||||
return {
|
||||
recordId: record.recordId,
|
||||
result: 'Ok',
|
||||
// payload is encoded back to base64 before returning the result
|
||||
data: Buffer.from(outputRecord, "utf-8").toString("base64")
|
||||
}
|
||||
|
||||
}else{
|
||||
return {
|
||||
recordId: record.recordId,
|
||||
result: 'dropped',
|
||||
data: record.data // payload is kept intact,
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
3
nodejs/errorhandling/README.md
Normal file
3
nodejs/errorhandling/README.md
Normal file
@@ -0,0 +1,3 @@
|
||||
# Examples for [Error Handling in Express](guide-to-error-handling-in-express)
|
||||
|
||||
This repository contains the source code of the article's examples.
|
||||
94
nodejs/errorhandling/js/index.js
Normal file
94
nodejs/errorhandling/js/index.js
Normal file
@@ -0,0 +1,94 @@
|
||||
const express = require('express')
|
||||
const axios = require("axios")
|
||||
// const morgan = require('morgan')
|
||||
|
||||
const app = express()
|
||||
|
||||
|
||||
const requestLogger = (request, response, next) => {
|
||||
console.log(`${request.method} url:: ${request.url}`);
|
||||
next()
|
||||
}
|
||||
|
||||
app.use(express.static('images'))
|
||||
app.use(express.static('htmls'))
|
||||
|
||||
app.use('/products', express.json({ limit: 100 }))
|
||||
|
||||
// Error handling Middleware functions
|
||||
const errorLogger = (error, request, response, next) => {
|
||||
console.log( `error ${error.message}`)
|
||||
next(error) // calling next middleware
|
||||
}
|
||||
|
||||
const errorResponder = (error, request, response, next) => {
|
||||
response.header("Content-Type", 'application/json')
|
||||
|
||||
const status = error.status || 400
|
||||
response.status(status).send(error.message)
|
||||
}
|
||||
const invalidPathHandler = (request, response, next) => {
|
||||
response.status(400)
|
||||
response.send('invalid path')
|
||||
}
|
||||
|
||||
|
||||
// handle post request for path /products
|
||||
app.post('/products', (request, response) => {
|
||||
const products = []
|
||||
|
||||
const name = request.body.name
|
||||
|
||||
const brand = request.body.brand
|
||||
|
||||
const category = request.body.category
|
||||
|
||||
if(name == null){
|
||||
res.status(400).json({ message: "Mandatory field name is missing. " })
|
||||
}else{
|
||||
console.log(name + " " + brand)
|
||||
|
||||
products.push({name: request.body.name, brand: request.body.brand, price: request.body.price})
|
||||
|
||||
const productCreationResponse = {productID: "12345", result: "success"}
|
||||
response.json(productCreationResponse)
|
||||
}
|
||||
})
|
||||
|
||||
app.get('/products', async (request, response, next)=>{
|
||||
try{
|
||||
const apiResponse = await axios.get("http://localhost:3001/products")
|
||||
|
||||
const jsonResponse = apiResponse.data
|
||||
console.log("response "+jsonResponse)
|
||||
|
||||
response.send(jsonResponse)
|
||||
}catch(error){
|
||||
next(error)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
app.get('/product', (request, response, next)=>{
|
||||
|
||||
axios.get("http://localhost:3001/product")
|
||||
.then(response=>response.json)
|
||||
.then(jsonresponse=>response.send(jsonresponse))
|
||||
.catch(next)
|
||||
})
|
||||
|
||||
app.get('/productswitherror', (request, response) => {
|
||||
let error = new Error(`processing error in request at ${request.url}`)
|
||||
error.statusCode = 400
|
||||
throw error
|
||||
})
|
||||
|
||||
app.use(errorLogger)
|
||||
app.use(errorResponder)
|
||||
app.use(invalidPathHandler)
|
||||
|
||||
const port = 3000
|
||||
|
||||
app.listen(3000,
|
||||
() => console.log(`Server listening on port ${port}.`));
|
||||
|
||||
33
nodejs/errorhandling/js/lambda.js
Normal file
33
nodejs/errorhandling/js/lambda.js
Normal file
@@ -0,0 +1,33 @@
|
||||
console.log('Loading function');
|
||||
|
||||
const validateRecord = (recordElement)=>{
|
||||
// record is considered valid if contains status field
|
||||
return recordElement.includes("status")
|
||||
}
|
||||
|
||||
exports.handler = async (event, context) => {
|
||||
/* Process the list of records and transform them */
|
||||
const output = event.records.map((record)=>{
|
||||
const decodedData = Buffer.from(record.data, "base64").toString("utf-8")
|
||||
let isValidRecord = validateRecord(decodedData)
|
||||
|
||||
if(isValidRecord){
|
||||
let parsedRecord = JSON.parse(decodedData)
|
||||
// read fields from parsed JSON for some more processing
|
||||
const outputRecord = `status::${parsedRecord.status}`
|
||||
return {
|
||||
recordId: record.recordId,
|
||||
result: 'Ok',
|
||||
// payload is encoded back to base64 before returning the result
|
||||
data: Buffer.from(outputRecord, "utf-8").toString("base64")
|
||||
}
|
||||
|
||||
}else{
|
||||
return {
|
||||
recordId: record.recordId,
|
||||
result: 'dropped',
|
||||
data: record.data // payload is kept intact,
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
20
nodejs/errorhandling/js/server.js
Normal file
20
nodejs/errorhandling/js/server.js
Normal file
@@ -0,0 +1,20 @@
|
||||
const express = require('express')
|
||||
// const morgan = require('morgan')
|
||||
|
||||
const app = express()
|
||||
|
||||
const port = 3001
|
||||
|
||||
const products = [
|
||||
{name:"Television", price: 24.56, currency: "USG", brand: "samsung"},
|
||||
{name:"Washing Machine", price: 67.56, currency: "EUR", brand: "LG"}
|
||||
]
|
||||
|
||||
app.get('/products', (request, response)=>{
|
||||
|
||||
response.json(products)
|
||||
})
|
||||
|
||||
app.listen(port,
|
||||
() => console.log(`Server listening on port ${port}.`))
|
||||
|
||||
23
nodejs/errorhandling/package.json
Normal file
23
nodejs/errorhandling/package.json
Normal file
@@ -0,0 +1,23 @@
|
||||
{
|
||||
"name": "storefront",
|
||||
"version": "1.0.0",
|
||||
"description": "",
|
||||
"main": "js/index.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
},
|
||||
"keywords": [],
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"axios": "^0.26.1",
|
||||
"express": "^4.17.3",
|
||||
"node-fetch": "^3.2.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/express": "^4.17.13",
|
||||
"@types/node": "^17.0.23",
|
||||
"ts-node": "^10.7.0",
|
||||
"typescript": "^4.6.3"
|
||||
}
|
||||
}
|
||||
126
nodejs/errorhandling/ts/app.ts
Normal file
126
nodejs/errorhandling/ts/app.ts
Normal file
@@ -0,0 +1,126 @@
|
||||
import express, { Request, Response, NextFunction } from 'express'
|
||||
import axios from 'axios'
|
||||
|
||||
const app = express()
|
||||
const port:number = 3000
|
||||
|
||||
|
||||
interface Product {
|
||||
|
||||
name: string
|
||||
price: number
|
||||
brand: string
|
||||
category?: string
|
||||
}
|
||||
|
||||
interface ProductCreationResponse {
|
||||
productID: string
|
||||
result: string
|
||||
}
|
||||
|
||||
|
||||
class AppError extends Error{
|
||||
statusCode: number;
|
||||
|
||||
constructor(statusCode: number, message: string) {
|
||||
super(message);
|
||||
|
||||
Object.setPrototypeOf(this, new.target.prototype);
|
||||
this.name = Error.name;
|
||||
this.statusCode = statusCode;
|
||||
Error.captureStackTrace(this);
|
||||
}
|
||||
}
|
||||
|
||||
const requestLogger = (request: Request, response: Response, next: NextFunction) => {
|
||||
console.log(`${request.method} url:: ${request.url}`);
|
||||
next()
|
||||
}
|
||||
|
||||
app.use(express.static('images'))
|
||||
app.use(express.static('htmls'))
|
||||
app.use(requestLogger)
|
||||
|
||||
app.use('/products', express.json({ limit: 100 }))
|
||||
|
||||
// Error handling Middleware functions
|
||||
const errorLogger = (error: Error, request: Request, response: Response, next: NextFunction) => {
|
||||
console.log( `error ${error.message}`)
|
||||
next(error) // calling next middleware
|
||||
}
|
||||
|
||||
const errorResponder = (error: AppError, request: Request, response: Response, next: NextFunction) => {
|
||||
response.header("Content-Type", 'application/json')
|
||||
|
||||
const status = error.statusCode || 400
|
||||
response.status(status).send(error.message)
|
||||
}
|
||||
|
||||
const invalidPathHandler = (request: Request, response: Response, next: NextFunction) => {
|
||||
response.status(400)
|
||||
response.send('invalid path')
|
||||
}
|
||||
|
||||
|
||||
// handle post request for path /products
|
||||
app.post('/products', (request: Request, response: Response) => {
|
||||
const products = []
|
||||
|
||||
const name = request.body.name
|
||||
|
||||
const brand = request.body.brand
|
||||
|
||||
const category = request.body.category
|
||||
|
||||
if(name == null){
|
||||
response.status(500).json({ message: "Mandatory field name is missing. " })
|
||||
}else{
|
||||
console.log(name + " " + brand)
|
||||
|
||||
products.push({name: request.body.name, brand: request.body.brand, price: request.body.price})
|
||||
|
||||
const productCreationResponse = {productID: "12345", result: "success"}
|
||||
response.json(productCreationResponse)
|
||||
}
|
||||
})
|
||||
|
||||
app.get('/products', async (request: Request, response: Response, next: NextFunction)=>{
|
||||
try{
|
||||
const apiResponse = await axios.get("http://localhost:3001/products")
|
||||
|
||||
const jsonResponse = apiResponse.data
|
||||
console.log("response "+jsonResponse)
|
||||
|
||||
response.send(jsonResponse)
|
||||
}catch(error){
|
||||
next(error)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
app.get('/product', (request: Request, response: Response, next: NextFunction)=>{
|
||||
|
||||
axios.get("http://localhost:3001/product")
|
||||
.then(jsonresponse=>response.send(jsonresponse))
|
||||
.catch(next)
|
||||
})
|
||||
|
||||
app.get('/productswitherror', (request, response) => {
|
||||
let error:AppError = new AppError(400, `processing error in request at ${request.url}`)
|
||||
error.statusCode = 400
|
||||
throw error
|
||||
})
|
||||
|
||||
app.get('/productswitherror', (request: Request, response: Response) => {
|
||||
let error: AppError = new AppError(400, `processing error in request at ${request.url}`)
|
||||
|
||||
throw error
|
||||
})
|
||||
|
||||
app.use(errorLogger)
|
||||
app.use(errorResponder)
|
||||
app.use(invalidPathHandler)
|
||||
|
||||
app.listen(port, () => {
|
||||
console.log(`Server listening at port ${port}.`)
|
||||
})
|
||||
28
nodejs/errorhandling/ts/server.ts
Normal file
28
nodejs/errorhandling/ts/server.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import express, { Request, Response, NextFunction } from 'express'
|
||||
|
||||
const app = express()
|
||||
|
||||
const port:number = 3001
|
||||
|
||||
interface Product {
|
||||
|
||||
name: string
|
||||
price: number
|
||||
currency: string
|
||||
brand: string
|
||||
category?: string
|
||||
}
|
||||
|
||||
const products: Product[] = [
|
||||
{name:"Television", price: 24.56, currency: "USG", brand: "samsung"},
|
||||
{name:"Washing Machine", price: 67.56, currency: "EUR", brand: "LG"}
|
||||
]
|
||||
|
||||
app.get('/products', (request: Request, response: Response)=>{
|
||||
|
||||
response.json(products)
|
||||
})
|
||||
|
||||
app.listen(port,
|
||||
() => console.log(`Server listening on port ${port}.`))
|
||||
|
||||
8
nodejs/errorhandling/tsconfig.json
Normal file
8
nodejs/errorhandling/tsconfig.json
Normal file
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"module": "commonjs",
|
||||
"target": "es6",
|
||||
"rootDir": "./ts",
|
||||
"esModuleInterop": true
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user