diff --git a/aws/kinesis/src/main/java/io/pratik/flink/ErrorCounter.java b/aws/kinesis/src/main/java/io/pratik/flink/ErrorCounter.java index 9415c4b..b2fe14d 100644 --- a/aws/kinesis/src/main/java/io/pratik/flink/ErrorCounter.java +++ b/aws/kinesis/src/main/java/io/pratik/flink/ErrorCounter.java @@ -28,6 +28,63 @@ public class ErrorCounter { DataStream inputStream = createSource(env); + DataStream logRecords = mapStringToLogRecord(inputStream); + + DataStream errorRecords = filterErrorRecords(logRecords); + + DataStream keyedStream = assignIPasKey(errorRecords); + + + DataStream keyedStreamAsText = mapLogRecordToString(keyedStream); + + //TODO Uncomment this code for deploying to Kinesis Data Analytics + + // keyedStream.addSink(createSink()); + + keyedStreamAsText.print(); + + env.execute("Error alerts"); + + } + + + + private static DataStream mapLogRecordToString(DataStream keyedStream) { + DataStream keyedStreamAsText = keyedStream.flatMap(new FlatMapFunction() { + + @Override + public void flatMap(LogRecord value, Collector out) throws Exception { + out.collect(value.getUrl()+"::" + value.getHttpStatus()); + } + }); + return keyedStreamAsText; + } + + + + private static DataStream assignIPasKey(DataStream errorRecords) { + DataStream keyedStream = errorRecords.keyBy(value -> value.getIp()); + return keyedStream; + } + + + + private static DataStream filterErrorRecords(DataStream logRecords) { + DataStream errorRecords = logRecords.filter(new FilterFunction() { + + @Override + public boolean filter(LogRecord value) throws Exception { + boolean matched = !value.getHttpStatus().equalsIgnoreCase("200"); + + return matched; + } + }); + return errorRecords; + } + + + + private static DataStream mapStringToLogRecord(DataStream inputStream) { DataStream logRecords = inputStream.flatMap(new FlatMapFunction() { @Override @@ -45,34 +102,10 @@ public class ErrorCounter { } }); - - DataStream errorRecords = logRecords.filter(new FilterFunction() { - - @Override - public boolean filter(LogRecord value) throws Exception { - boolean matched = !value.getHttpStatus().equalsIgnoreCase("200"); - - return matched; - } - }); - - DataStream keyedStream = errorRecords.keyBy(value -> value.getIp()).flatMap(new FlatMapFunction() { - - @Override - public void flatMap(LogRecord value, Collector out) throws Exception { - out.collect(value.getUrl()+"::" + value.getHttpStatus()); - } - }); - - //TODO Uncomment this code for deploying to Kinesis Data Analytics - - // keyedStream.addSink(createSink()); - - keyedStream.print(); - - env.execute("Error alerts"); - + return logRecords; } + + /*private static void createSink(final StreamExecutionEnvironment env, DataStream input) { input.print(); diff --git a/aws/kinesis/src/main/resources/lambda_function/index.js b/aws/kinesis/src/main/resources/lambda_function/index.js new file mode 100644 index 0000000..fde5c8c --- /dev/null +++ b/aws/kinesis/src/main/resources/lambda_function/index.js @@ -0,0 +1,33 @@ +console.log('Loading function'); + +const validateRecord = (recordElement)=>{ + // record is considered valid if contains status field + return recordElement.includes("status") +} + +exports.handler = async (event, context) => { + /* Process the list of records and transform them */ + const output = event.records.map((record)=>{ + const decodedData = Buffer.from(record.data, "base64").toString("utf-8") + let isValidRecord = validateRecord(decodedData) + + if(isValidRecord){ + let parsedRecord = JSON.parse(decodedData) + // read fields from parsed JSON for some more processing + const outputRecord = `status::${parsedRecord.status}` + return { + recordId: record.recordId, + result: 'Ok', + // payload is encoded back to base64 before returning the result + data: Buffer.from(outputRecord, "utf-8").toString("base64") + } + + }else{ + return { + recordId: record.recordId, + result: 'dropped', + data: record.data // payload is kept intact, + } + } + }) +}; diff --git a/nodejs/errorhandling/README.md b/nodejs/errorhandling/README.md new file mode 100644 index 0000000..60bdcd7 --- /dev/null +++ b/nodejs/errorhandling/README.md @@ -0,0 +1,3 @@ +# Examples for [Error Handling in Express](guide-to-error-handling-in-express) + +This repository contains the source code of the article's examples. \ No newline at end of file diff --git a/nodejs/errorhandling/js/index.js b/nodejs/errorhandling/js/index.js new file mode 100644 index 0000000..6c8bd43 --- /dev/null +++ b/nodejs/errorhandling/js/index.js @@ -0,0 +1,94 @@ +const express = require('express') +const axios = require("axios") +// const morgan = require('morgan') + +const app = express() + + +const requestLogger = (request, response, next) => { + console.log(`${request.method} url:: ${request.url}`); + next() +} + +app.use(express.static('images')) +app.use(express.static('htmls')) + +app.use('/products', express.json({ limit: 100 })) + +// Error handling Middleware functions +const errorLogger = (error, request, response, next) => { + console.log( `error ${error.message}`) + next(error) // calling next middleware +} + +const errorResponder = (error, request, response, next) => { +response.header("Content-Type", 'application/json') + +const status = error.status || 400 +response.status(status).send(error.message) +} +const invalidPathHandler = (request, response, next) => { +response.status(400) +response.send('invalid path') +} + + +// handle post request for path /products +app.post('/products', (request, response) => { + const products = [] + + const name = request.body.name + + const brand = request.body.brand + + const category = request.body.category + + if(name == null){ + res.status(400).json({ message: "Mandatory field name is missing. " }) + }else{ + console.log(name + " " + brand) + + products.push({name: request.body.name, brand: request.body.brand, price: request.body.price}) + + const productCreationResponse = {productID: "12345", result: "success"} + response.json(productCreationResponse) + } +}) + +app.get('/products', async (request, response, next)=>{ + try{ + const apiResponse = await axios.get("http://localhost:3001/products") + + const jsonResponse = apiResponse.data + console.log("response "+jsonResponse) + + response.send(jsonResponse) + }catch(error){ + next(error) + } + +}) + +app.get('/product', (request, response, next)=>{ + + axios.get("http://localhost:3001/product") + .then(response=>response.json) + .then(jsonresponse=>response.send(jsonresponse)) + .catch(next) +}) + +app.get('/productswitherror', (request, response) => { + let error = new Error(`processing error in request at ${request.url}`) + error.statusCode = 400 + throw error +}) + +app.use(errorLogger) +app.use(errorResponder) +app.use(invalidPathHandler) + +const port = 3000 + +app.listen(3000, + () => console.log(`Server listening on port ${port}.`)); + \ No newline at end of file diff --git a/nodejs/errorhandling/js/lambda.js b/nodejs/errorhandling/js/lambda.js new file mode 100644 index 0000000..573426a --- /dev/null +++ b/nodejs/errorhandling/js/lambda.js @@ -0,0 +1,33 @@ +console.log('Loading function'); + +const validateRecord = (recordElement)=>{ + // record is considered valid if contains status field + return recordElement.includes("status") +} + +exports.handler = async (event, context) => { + /* Process the list of records and transform them */ + const output = event.records.map((record)=>{ + const decodedData = Buffer.from(record.data, "base64").toString("utf-8") + let isValidRecord = validateRecord(decodedData) + + if(isValidRecord){ + let parsedRecord = JSON.parse(decodedData) + // read fields from parsed JSON for some more processing + const outputRecord = `status::${parsedRecord.status}` + return { + recordId: record.recordId, + result: 'Ok', + // payload is encoded back to base64 before returning the result + data: Buffer.from(outputRecord, "utf-8").toString("base64") + } + + }else{ + return { + recordId: record.recordId, + result: 'dropped', + data: record.data // payload is kept intact, + } + } + }) +}; \ No newline at end of file diff --git a/nodejs/errorhandling/js/server.js b/nodejs/errorhandling/js/server.js new file mode 100644 index 0000000..7f20fee --- /dev/null +++ b/nodejs/errorhandling/js/server.js @@ -0,0 +1,20 @@ +const express = require('express') +// const morgan = require('morgan') + +const app = express() + +const port = 3001 + +const products = [ + {name:"Television", price: 24.56, currency: "USG", brand: "samsung"}, + {name:"Washing Machine", price: 67.56, currency: "EUR", brand: "LG"} +] + +app.get('/products', (request, response)=>{ + + response.json(products) +}) + +app.listen(port, + () => console.log(`Server listening on port ${port}.`)) + \ No newline at end of file diff --git a/nodejs/errorhandling/package.json b/nodejs/errorhandling/package.json new file mode 100644 index 0000000..9f75654 --- /dev/null +++ b/nodejs/errorhandling/package.json @@ -0,0 +1,23 @@ +{ + "name": "storefront", + "version": "1.0.0", + "description": "", + "main": "js/index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "axios": "^0.26.1", + "express": "^4.17.3", + "node-fetch": "^3.2.2" + }, + "devDependencies": { + "@types/express": "^4.17.13", + "@types/node": "^17.0.23", + "ts-node": "^10.7.0", + "typescript": "^4.6.3" + } +} diff --git a/nodejs/errorhandling/ts/app.ts b/nodejs/errorhandling/ts/app.ts new file mode 100644 index 0000000..a5fde57 --- /dev/null +++ b/nodejs/errorhandling/ts/app.ts @@ -0,0 +1,126 @@ +import express, { Request, Response, NextFunction } from 'express' +import axios from 'axios' + +const app = express() +const port:number = 3000 + + +interface Product { + + name: string + price: number + brand: string + category?: string +} + +interface ProductCreationResponse { + productID: string + result: string +} + + +class AppError extends Error{ + statusCode: number; + + constructor(statusCode: number, message: string) { + super(message); + + Object.setPrototypeOf(this, new.target.prototype); + this.name = Error.name; + this.statusCode = statusCode; + Error.captureStackTrace(this); + } +} + +const requestLogger = (request: Request, response: Response, next: NextFunction) => { + console.log(`${request.method} url:: ${request.url}`); + next() +} + +app.use(express.static('images')) +app.use(express.static('htmls')) +app.use(requestLogger) + +app.use('/products', express.json({ limit: 100 })) + +// Error handling Middleware functions +const errorLogger = (error: Error, request: Request, response: Response, next: NextFunction) => { + console.log( `error ${error.message}`) + next(error) // calling next middleware +} + +const errorResponder = (error: AppError, request: Request, response: Response, next: NextFunction) => { + response.header("Content-Type", 'application/json') + + const status = error.statusCode || 400 + response.status(status).send(error.message) +} + +const invalidPathHandler = (request: Request, response: Response, next: NextFunction) => { + response.status(400) + response.send('invalid path') +} + + +// handle post request for path /products +app.post('/products', (request: Request, response: Response) => { + const products = [] + + const name = request.body.name + + const brand = request.body.brand + + const category = request.body.category + + if(name == null){ + response.status(500).json({ message: "Mandatory field name is missing. " }) + }else{ + console.log(name + " " + brand) + + products.push({name: request.body.name, brand: request.body.brand, price: request.body.price}) + + const productCreationResponse = {productID: "12345", result: "success"} + response.json(productCreationResponse) + } +}) + +app.get('/products', async (request: Request, response: Response, next: NextFunction)=>{ + try{ + const apiResponse = await axios.get("http://localhost:3001/products") + + const jsonResponse = apiResponse.data + console.log("response "+jsonResponse) + + response.send(jsonResponse) + }catch(error){ + next(error) + } + +}) + +app.get('/product', (request: Request, response: Response, next: NextFunction)=>{ + + axios.get("http://localhost:3001/product") + .then(jsonresponse=>response.send(jsonresponse)) + .catch(next) +}) + +app.get('/productswitherror', (request, response) => { + let error:AppError = new AppError(400, `processing error in request at ${request.url}`) + error.statusCode = 400 + throw error +}) + +app.get('/productswitherror', (request: Request, response: Response) => { + let error: AppError = new AppError(400, `processing error in request at ${request.url}`) + + throw error + }) + + app.use(errorLogger) + app.use(errorResponder) + app.use(invalidPathHandler) + +app.listen(port, () => { + console.log(`Server listening at port ${port}.`) +}) \ No newline at end of file diff --git a/nodejs/errorhandling/ts/server.ts b/nodejs/errorhandling/ts/server.ts new file mode 100644 index 0000000..32c5d4c --- /dev/null +++ b/nodejs/errorhandling/ts/server.ts @@ -0,0 +1,28 @@ +import express, { Request, Response, NextFunction } from 'express' + +const app = express() + +const port:number = 3001 + +interface Product { + + name: string + price: number + currency: string + brand: string + category?: string +} + +const products: Product[] = [ + {name:"Television", price: 24.56, currency: "USG", brand: "samsung"}, + {name:"Washing Machine", price: 67.56, currency: "EUR", brand: "LG"} +] + +app.get('/products', (request: Request, response: Response)=>{ + + response.json(products) +}) + +app.listen(port, + () => console.log(`Server listening on port ${port}.`)) + \ No newline at end of file diff --git a/nodejs/errorhandling/tsconfig.json b/nodejs/errorhandling/tsconfig.json new file mode 100644 index 0000000..162ef62 --- /dev/null +++ b/nodejs/errorhandling/tsconfig.json @@ -0,0 +1,8 @@ +{ + "compilerOptions": { + "module": "commonjs", + "target": "es6", + "rootDir": "./ts", + "esModuleInterop": true + } +}