3/26/2022

AWS SNS and Lambda Lesson Learn



 

The purpose of this blog post is to share my recent experience with AWS budget alerting, using above AWS stack.

We can create an alert to trigger alert message when certain threshold increase at AWS budget.

Then what happen is, that alert message will publish to SNS topic and same message will be consumed by Lambda function.

We can do transform the message and sent to Microsoft Team channel through a Webhook. Please refer this document for more details about this approach.

https://aws.amazon.com/premiumsupport/knowledge-center/sns-lambda-webhooks-chime-slack-teams/


Steps


1. Create AWS Budget Alert

a. Create AWS Budget

https://docs.aws.amazon.com/cost-management/latest/userguide/budgets-create.html

b. Create AWS Alert

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/gs_monitor_estimated_charges_with_cloudwatch.html#gs_creating_billing_alarm






 





2. Create Lambda function

https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html

Now in our case we are using Python Lambda function. so please refer below section.

https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html

3. Subscribe Lambda function to SNS topic

https://docs.aws.amazon.com/sns/latest/dg/sns-lambda-as-subscriber.html


4. Analyzing the sample code from below document.

https://aws.amazon.com/premiumsupport/knowledge-center/sns-lambda-webhooks-chime-slack-teams/


#!/usr/bin/python3.6

import urllib3

import json

http = urllib3.PoolManager()

def lambda_handler(event, context):

    url = "https://outlook.office.com/webhook/xxxxxxx"

    msg = {

    "text": event['Records'][0]['Sns']['Message']

    }

    print('Type of message', type(msg))     

    encoded_msg = json.dumps(msg).encode('utf-8')

    resp = http.request('POST',url, body=encoded_msg)

    print("=====After encode======",encoded_msg)

    print({

    "status_code": resp.status,

    "response": resp.data

    })


5.  Unit Testing

Now we are getting a JSON message from SNS topic, but when we encode it, to send to MS team via webhook, we are getting HTTP 400 error code.

 b'{"text": {"accountId": "xxxxxxx", "timeStamp": "2022-03-17T17:40:03.434Z", 


6. Integration Testing


When we  send same JSON to SNS topic, Lambda function will send it to MS team successfully.

a. Publish message by clicking Publish message button.






Copy paste the JSON message in Message body section and click Publish Message button.


7. Debug with Cloud Watch Logs.





a. Click View logs in CloudWatch .





b. Click latest Log stream .







8. Comparison of two logs 







As you can see the different of above two logs, the success encoded JSON message, child node wrapped with a double quotes.


9. Improve Lambda Function to Transform Output Message.


Now we have improved the Lambda function to transform the output message to another JSON.


 #!/usr/bin/python3.6

import urllib3 

import json

import ast

http = urllib3.PoolManager() 

def lambda_handler(event, context): 

    url = "https://outlook.office.com/webhook/xxxxxxx"    

    alert_dict = event['Records'][0]['Sns']['Message']

    if isinstance(alert_dict, str):

      print('Type of variable is a string')

      alert_dict = ast.literal_eval(alert_dict)

    else:

      print('Type is variable is not a string')

    

    #Construct new json format

    title = "Budget Exceed Alert "

    limit = (alert_dict['budget']['budgetLimit']['amount'])

    threshold = (alert_dict['action']['actionThreshold']['value'])

    print("===========", type(limit))

    print("===========", threshold)

    actual_amount = (limit * threshold) /100


    alert_json = dict(AlertTitle=title,TimeStamp=alert_dict['timeStamp'],BudgetName=alert_dict['budget']['budgetName'],BudgetType=alert_dict['budget']['budgetType'],BudgetAmount=alert_dict['budget']['budgetLimit']['amount'],AlertThreshold=alert_dict['action']['actionThreshold']['value'],ActualAmount=actual_amount,AlertType=alert_dict['action']['notificationType'],Message=alert_dict['message'])

    summery = {

        "text": json.dumps(alert_json) 

    }

    

    print("===========", type(summery))

    print("=====Before JSON======", summery)

    json_object = json.dumps(summery)  

    encoded_msg = json_object.encode('utf-8')

    print("=====After encode======",encoded_msg)    

    resp = http.request('POST',url, body=encoded_msg)

    print({

        "status_code": resp.status, 

        "response": resp.data

    })


10. Unit Testing










11. Integration Testing









12. Conclusion

  • Now both, unite test and integration test will generate same output JSON.
  • The way create JSON is important in Python and it's bit tricky.

summery = {

        "text": json.dumps(alert_json) 

    }


  • Did you notice one difference in both testing?
Unit test : 32 : Integer value
Integration: 32.0 : Float value

Same python script can be behaved differently when it comes to AWS Lambda.





3/06/2022

Learning Camunda/Zeebe by Example - SAGA Pattern

 






























Purpose of this blogpost is to explain how to use Zeebe/Camunda to implement SAGA pattern with Microservices.

This is the next episode of previous blogpost

To setup Zeebe cluster in Kubernetes, you have to follow same steps in previous blogpost.

Also I have made a slight different to docker-compose.yml files, to run it in host network.
Please refer this git commit for for details : 


Now let's build the project with maven command : mvn clean install

https://github.com/dhanuka84/SAGA-Microservices-Zeebe/tree/main/src/zeebe-saga-spring-boot





Then Build the docker images with maven spring-boot plugin:

mvn spring-boot:build-image


  • To deploy Zeebe cluster, you have to go through same steps given in previous post.

Deploy MongoDB Cluster

MONGO_REPLICASET_HOST=mongo docker-compose -f src/zeebe-saga-spring-boot/docker/docker-compose-mongo.yaml up



Deploy Microservices

docker-compose  -f src/zeebe-saga-spring-boot/docker/docker-compose-micros.yaml up



Test Microservices Health with Actuator ( status, liveness, readiness)



dhanuka@dhanuka:~$ curl http://localhost:8081/actuator/health | jq '. | {status: .status, liveness: .components.livenessState.status, readiness: .components.readinessState.status,}'   


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   352  100   352    0     0    592      0 --:--:-- --:--:-- --:--:--   593

{

  "status": "UP",

  "liveness": "UP",

  "readiness": "UP"

}


dhanuka@dhanuka:~$ curl http://localhost:8083/actuator/health | jq '. | {status: .status, liveness: .components.livenessState.status, readiness: .components.readinessState.status,}'   


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   352  100   352    0     0    824      0 --:--:-- --:--:-- --:--:--   822

{

  "status": "UP",

  "liveness": "UP",

  "readiness": "UP"

}


dhanuka@dhanuka:~$ curl http://localhost:8084/actuator/health | jq '. | {status: .status, liveness: .components.livenessState.status, readiness: .components.readinessState.status,}'   


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   352  100   352    0     0    675      0 --:--:-- --:--:-- --:--:--   675

{

  "status": "UP",

  "liveness": "UP",

  "readiness": "UP"

}




dhanuka@dhanuka:~$ curl http://localhost:8082/actuator/health | jq '. | {status: .status, liveness: .components.livenessState.status, readiness: .components.readinessState.status,}'   


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   352  100   352    0     0    820      0 --:--:-- --:--:-- --:--:--   822

{

  "status": "UP",

  "liveness": "UP",

  "readiness": "UP"

}



Deploy Zeebe BPMN Process


dhanuka@dhanuka:~$ zbctl deploy workflows/saga-example.bpmn --insecure

{

  "key": "2251799813703663",

  "processes": [

    {

      "bpmnProcessId": "trip-booking",

      "version": 1,

      "processDefinitionKey": "2251799813703662",

      "resourceName": "workflows/saga-example.bpmn"

    }

  ]

}



Create a booking via Rest Call to Booking Microservice

 

dhanuka@dhanuka:~$ curl --location --request POST 'http://localhost:8081/booking/' \

--header 'Content-Type: application/json' \

--data-raw '{

    "id" : "0",

    "clientId":"123",

    "resourceId":"987",

    "fromDate":"2021-02-22T14:52:44.494264+01:00",

    "toDate":"2021-03-06T14:52:44.495451+01:00",

    "createdAt":"2021-02-10T14:52:44.495469+01:00",

    "active":false

}'

 

{"id":"0","clientId":"123","houseBookingId":"642c957b-f7ed-45d9-8719-b11fa451dbfa","carBookingId":"dbb44830-2cb6-4c02-ab5d-bf0850e69bb0","flightBookingId":"b573f7ca-cab9-48ab-9bdf-cc08e5e199ae","fromDate":"2021-02-22T13:52:44.494264Z","toDate":"2021-03-06T13:52:44.495451Z","createdAt":"2021-02-10T13:52:44.495469Z","active":false}

  •  Once you login to Zeebe dashboard via http://localhost:8080/ , you can select the process id (trip-booking) and then version, then you can select the process instance
  • Then you can see the happy path of the work flow.

 

 
Java Code Explanation
 

1. Where is Zeebee Process Instance Created?

https://github.com/dhanuka84/SAGA-Microservices-Zeebe/blob/main/src/zeebe-saga-spring-boot/booking-microservice/src/main/java/com/example/booking/service/impl/BookingServiceImpl.java

 

 

  • As you can see, the instance was created in line 34.

Key Points:

  1. Line 34 : Spring WebFlux, which provides reactive programming support for web applications

https://www.baeldung.com/spring-webflux

  1. Line 37: Creating a variable called bookingResult with Booking data.

  2. Line 40: Usage of Completable Futures.

https://www.callicoder.com/java-8-completablefuture-tutorial/#:~:text=Future%20vs%20CompletableFuture,result%20of%20an%20asynchronous%20computation.

  1. Line 49: Save the Trip Booking entity when received hotel booking id, car booking id and flight booking id.

  2. So all the operations within and outside the microservice will be asynchronous and reactive.

 

2. Zeebee Task polling/requesting for jobs, then execute and response back to broker.

  • Let’s take Hotel Booking as an example.

https://github.com/dhanuka84/SAGA-Microservices-Zeebe/blob/main/src/zeebe-saga-spring-boot/hotel-microservice/src/main/java/com/example/hotel/task/BookingTask.java

 

 

 

Key Points:

  1. Line 41,42,43 : Access job data. 

  2. Line 45 : Validate the job based on headers

  3. Line 50: Access Booking Info, based on requestName (bookingResult) variable value, which was created when process instance creation.

  4. Line 64: Create Hotel Booking and reactively update Zeebe with response.

  • Note that now the resultName variable value of response is bookHotelResult.

Failure Path Testing

  • You have to change the BPMN configuration to enable simulateError as below.


 

Deploy workflow process next version 

dhanuka@dhanuka:~$ zbctl deploy workflows/saga-example.bpmn --insecure

{

  "key": "2251799813703663",

  "processes": [

    {

      "bpmnProcessId": "trip-booking",

      "version": 2,

      "processDefinitionKey": "2251799813703662",

      "resourceName": "workflows/saga-example.bpmn"

    }

  ]

}

 

Create a booking via Rest Call to Booking Microservice

 

dhanuka@dhanuka:~$ curl --location --request POST 'http://localhost:8081/booking/' \

--header 'Content-Type: application/json' \

--data-raw '{

    "id" : "0",

    "clientId":"123",

    "resourceId":"987",

    "fromDate":"2021-02-22T14:52:44.494264+01:00",

    "toDate":"2021-03-06T14:52:44.495451+01:00",

    "createdAt":"2021-02-10T14:52:44.495469+01:00",

    "active":false

}'

 

  • Now you can see the failure path of the workflow 
  • We failed the work flow from Flight Booking task.
  • The failure will be propagated to other tasks which is polling on same instance_id .




Failure Path Java Explanation 


Key Points


1. We have a separate Zeebe task (ex: flight-booking-rollback) to handle failure scenario. This task simply delete relevant booking.

2. Line 45: If the SimulateError true, then this will trigger Failed response to Zeebe.





3. Now according tho BPMN workflow , if BookingFlight Result is not success, then it will call ServiceTask_CancelFlight as target reference.


 

As you can see ServiceTask_CancelFlight task definition is flight-booking-rollback .










Because of this, there will be a job to execute for flight-booking-rollback  task in Java Class.

4. Now how come this propagate to other Microservices?




In the BPMN configuration , you can see the chain of call from ServiceTask_CancelFlight to ServiceTask_CancelCar and ServiceTask_CancelHotel.

This will helped to create rollback jobs for other Microservices rollback tasks.



Some inspirational videos :)