7/15/2018

Short Notes: Design Principles & Patterns

History

The book called “A Pattern Language”[1]  by C.Alexander is the fundamental reference for the book named “Design Patterns (GoF)”, which is generally accepted as the standard in building software.

[1] https://en.wikipedia.org/wiki/A_Pattern_Language

A pattern language [2] is a method of describing good design practices or patterns of useful organization within a field of expertise.

[2] https://en.wikipedia.org/wiki/Pattern_language

Architectural Concepts

Envision - Examine stakeholder goals combined with architect’s expertise.
Model - View the ‘lay of the land’ to construct a model  and set expectations.
Blueprint - Specify the construction details  and constraints.
Inspect - Verify construction implementation.
Term Nomenclature - communicate  in terms understandable to listeners.

Mapping to software field

Envision - Software Architecture
Model - Design Patterns
Blueprint - Construction & Design principles 
Inspect - Construction patterns
Nomenclature - Terms to communicate intent


Software Architecture

Web server - A pipeline architecture
SOA - A component architecture
Client/Server - A layered architecture
Single Tier - Monolithic Architecture
Cloud - A tier network architecture

Design Patterns

Creational
Structural & Behavioral
Event Driven
Plugins

Design Principles

Abstraction
Encapsulation
Cohesion
Coupling
Complexity

Construction Patterns

Inheritance Design
Component Design
Layered Design
Tier Design
Delivery methodology
Software Architecture erosion (“Decay”)

OO Basics

Abstraction
Encapsulation
Polymorphism
Inheritance

OO Principles

  • Encapsulate what varies - Identify different behaviors
  • Favor composition over inheritance
  • Program to interfaces, not implementation
  • Strive for loosely coupled designs between objects that interact.
  • Open Close : Class should be open for extension but close for modification.
  • Dependency Inversion : Depend upon abstraction. Do not depend upon concrete classes.
  • Least Knowledge : Talk only to your immediate friends.

OO Patterns

What is a pattern : A reusable solution to a problem which occurring in a particular context.

Structure of a pattern

Name
Context - situation
Problem
Forces
Solution
Resulting Contex -  1. Benefits 2. Drawbacks 3. Issues to resolve
Related Patterns

Three fundamental groups

https://www.gofpatterns.com/design-patterns/module2/three-types-design-patterns.php


Behavioral patterns - Describe interactions between objects

  • Interpreter pattern -

Creational- Make objects of the right class for a problem

  • Factory vs Abstract Factory
  • Singleton-


Structural- Form larger structures from individual parts, generally of different classes.









7/14/2018

Short Notes : Kafka


Apache Avro


Logs



Another way to put this is in database terms: a pure stream is one where the changes are interpreted as just
INSERT statements (since no record replaces any existing record),
whereas a table is a stream where the changes are interpreted as UPDATEs (since any existing row with the same key is overwritten).




Paper





Article



Architecture


Big data messaging


Install zookeeper and kafka (horton works)




Simple consumer and producer app



Duality of stream & table + windowing



Kafka Connect sample




Partitions

Topics in Kafka can be subdivided into partitions. For example, while creating a topic named Demo, you might configure it to have three partitions.The server would create three log files, one for each of the demo partitions.

When a producer published a message to the topic, it would assign a partition ID for that message.

The server would then append the message to the log file for that partition only.


If you then started two consumers, the server might assign partitions 1 and 2 to the first consumer, and partition 3 to the second consumer. Each consumer would read only from its assigned partitions.


             1. Active-Passive (Partition Active/Passive)


kafka-partioning-active-passive.png





To expand the scenario, imagine a Kafka cluster with two brokers, housed in two machines. When you partitioned the demo topic, you would configure it to have two partitions and two replicas.

For this type of configuration, the Kafka server would assign the two partitions to the two brokers in your cluster.

Each broker would be the leader for one of the partitions.

When a producer published a message, it would go to the partition leader.

The leader would take the message and append it to the log file on the local machine.

The second broker would passively replicate that commit log to its own machine. If the partition leader went down, the second broker would become the new leader and start serving client requests.

In the same way, when a consumer sent a request to a partition, that request would go first to the partition leader, which would return the requested messages.


2. Active-Active

kafka-partioning-active-active.png


    1. Scalability: In a system with just one partition, messages published to a topic are stored in a log file, Partitioning a topic lets you scale your system by storing messages on different machines in a cluster. If you wanted to store 30 gigabytes (GB) of messages for the Demo topic, for instance, you could build a Kafka cluster of three machines,each with 10 GB of disk space. Then you would configure the topic to have three partitions. which exists on a single machine.

        The number of messages for a topic must fit into a single commit log file, and the size of messages stored can never be more than that machine's disk space.

    2. Server-load balancing: Having multiple partitions lets you spread message requests across brokers. For example, If you had a topic that processed 1 million messages per second, you could divide it into 100 partitions and add 100 brokers to your cluster.

        Each broker would be the leader for single partition, responsible for responding to just 10,000 client requests per second.

    3. Consumer-load balancing: Similar to server-load balancing, hosting multiple consumers on different machine lets you spread the consumer load.

        Let's say you wanted to consume 1 million messages per second from a topic with 100 partitions. You could create 100 consumers and run them in parallel.

        The Kafka server would assign one partition to each of the consumers, and each consumer would process 10,000 messages in parallel.

        Since Kafka assigns each partition to only one consumer, within the partition each message would be consumed in order.




Two partition two consumers under one user group

kafka-partioning.png

Two partition one producer three consumers under two user groups

multiple-consumer-group.png



Multi Threaded Consumer

Kafka Connect Architecture 

  • Connector model 

  • Worker model 

  • Data model


Connector Model

A connector is defined by specifying a Connector class and configuration options to control what data is copied and how to format it.
Each Connector instance is responsible for defining and updating a set of Tasks that actually copy the data. Kafka Connect manages the Tasks;
the Connector is only responsible for generating the set of Tasks and indicating to the framework when they need to be updated.

Connectors do not perform any data copying themselves: their configuration describes the data to be copied,
and the Connector is responsible for breaking that job into a set of Tasks that can be distributed to workers.


Standalone mode
../_images/connector-model.png


Distributed mode

../_images/worker-model-basics.png

First, Kafka Connect performs broad copying by default by having users define jobs at the level of Connectors which then break the job into smaller Tasks.

It also provides one point of parallelism by requiring Connectors to immediately consider how their job can be broken down into subtasks, Finally, by specializing source and sink interfaces, Kafka Connect provides an accessible connector API that makes it very easy to implement connectors for a variety of systems.

Worker Model

A Kafka Connect cluster consists of a set of Worker processes that are containers that execute Connectors and Tasks. Workers automatically coordinate with

each other to distribute work and provide scalability and fault tolerance. The Workers will distribute work among any available processes, but are not responsible for management of the processes; any process management strategy can be used for Workers (e.g. cluster management tools like YARN or Mesos, configuration management tools like Chef or Puppet, or direct management of process lifecycles).

The worker model allows Kafka Connect to scale to the application. It can run scaled down to a single worker process that also acts as its own coordinator, or in clustered mode where connectors and tasks are dynamically scheduled on workers.

However, it assumes very little about the process management of the workers, so it can easily run on a variety of cluster managers or using traditional service supervision.


Data Model

Connectors copy streams of messages from a partitioned input stream to a partitioned output stream, where at least one of the input or output is always Kafka.

Each of these streams is an ordered set messages where each message has an associated offset
The message contents are represented by Connectors in a serialization-agnostic format, and Kafka Connect supports pluggable Converters for storing this data in a variety of serialization formats. Schemas are built-in, allowing important metadata about the format of messages to be propagated through complex data pipelines. 
However, schema-free data can also be used when a schema is simply unavailable.

This allows Kafka Connect to focus only on copying data because a variety of stream processing tools are available to further process the data, which keeps Kafka Connect simple, both conceptually and in its implementation.



Kafka on Yarn (KOYA)



high-lvl-kafka-f1
KOYA - Apache Kafka on YARN

Why not confluent platform? It’s open source as well



Geo Redundancy 
  1. Netflix Use case


     2. Recommended approach (In same region)




 
  3. Best approach for Multi Region to avoid cycles




Mirror Message handling in Kafka to avoid cycling


Master-Master replication





   4. Kafka + ElasticSearch


https://www.elastic.co/blog/scaling_elasticsearch_across_data_centers_with_kafkahttps://www.elastic.co/blog/scaling_elasticsearch_across_data_centers_with_kafka

* Note: we can replace Logstash with Kafka Connect Sink Connector.


Mirror Maker

Important of mirror maker and improvements done with it

Some references





Message Handler sample



Kafka Messaging Semantics

Exactly one


Message compaction


Partitioning




Kafka tuning recommendation


Kafka Cluster




Kafka Cluster Deployment Recommendation


Dedicated Zookeeper
  • Have a separate zookeeper cluster dedicated to Storm/Kafka operations. This will improve Storm/Kafka’s performance for writing offsets to Zookeeper,
    it will not be competing with HBase or other components for read/write access.
ZK on separate nodes from Kafka Broker
  • Do Not Install zk nodes on the same node as kafka broker if you want optimal Kafka performance. Disk I/O both kafka and zk are disk I/O intensive.



Zookeeper recommendation:


Design Doc



Kafka Data Replication




Hard Problems



7/13/2018

JVM Heap Memory Allocation : G1 vs CMS


People misunderstand about how G1 GC collector memory allocate at JVM boot up. We normally configure max and min values for heap but for G1 GC collector, no need to configure min value. Let's try out two scenarios and have a look at the observations.

G1 GC Collector

ps -ef | grep java
uranadh   4645 4639 16 19:46 ?        00:01:02 /usr/bin/java -Dosgi.requiredJavaVersion=1.8 -XX:+UseG1GC -XX:+UseStringDeduplication -Dosgi.requiredJavaVersion=1.8 -Xms5000m -Xmx5000m -jar /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar -os linux -ws gtk -arch x86_64 -showsplash /home/uranadh/software/eclipse//plugins/org.eclipse.platform_4.6.3.v20170301-0400/splash.bmp -launcher /home/uranadh/software/eclipse/eclipse -name Eclipse --launcher.library /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.1.401.v20161122-1740/eclipse_1618.so -startup /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar --launcher.appendVmargs -exitdata 240005 -product org.eclipse.epp.package.jee.product -vm /usr/bin/java -vmargs -Dosgi.requiredJavaVersion=1.8 -XX:+UseG1GC -XX:+UseStringDeduplication -Dosgi.requiredJavaVersion=1.8 -Xms5000m -Xmx5000m -jar /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar
uranadh   4792 4787 33 19:48 ?        00:01:20 /usr/bin/java -Dosgi.requiredJavaVersion=1.8 -XX:+UseG1GC -XX:+UseStringDeduplication -Dosgi.requiredJavaVersion=1.8 -Xms10000m -Xmx10000m -jar /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar -os linux -ws gtk -arch x86_64 -showsplash /home/uranadh/software/eclipse//plugins/org.eclipse.platform_4.6.3.v20170301-0400/splash.bmp -launcher /home/uranadh/software/eclipse/eclipse -name Eclipse --launcher.library /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.1.401.v20161122-1740/eclipse_1618.so -startup /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar --launcher.appendVmargs -exitdata 28800f -product org.eclipse.epp.package.jee.product -vm /usr/bin/java -vmargs -Dosgi.requiredJavaVersion=1.8 -XX:+UseG1GC -XX:+UseStringDeduplication -Dosgi.requiredJavaVersion=1.8 -Xms10000m -Xmx10000m -jar /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar
uranadh   4938 4932 99 19:51 ?        00:01:21 /usr/bin/java -Dosgi.requiredJavaVersion=1.8 -XX:+UseG1GC -XX:+UseStringDeduplication -Dosgi.requiredJavaVersion=1.8 -Xms10000m -Xmx10000m -jar /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar -os linux -ws gtk -arch x86_64 -showsplash /home/uranadh/software/eclipse//plugins/org.eclipse.platform_4.6.3.v20170301-0400/splash.bmp -launcher /home/uranadh/software/eclipse/eclipse -name Eclipse --launcher.library /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.1.401.v20161122-1740/eclipse_1618.so -startup /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar --launcher.appendVmargs -exitdata 2b0012 -product org.eclipse.epp.package.jee.product -vm /usr/bin/java -vmargs -Dosgi.requiredJavaVersion=1.8 -XX:+UseG1GC -XX:+UseStringDeduplication -Dosgi.requiredJavaVersion=1.8 -Xms10000m -Xmx10000m -jar /home/uranadh/software/eclipse//plugins/org.eclipse.equinox.launcher_1.3.201.v20161025-1711.jar
uranadh   5087 2966  0 19:52 pts/19   00:00:00 grep --color=auto java
uranadh@uranadh:~$ free -m
                 total            used        free      shared  buff/cache   available
Mem:          15953        4674        9148     257              2130     10683
Swap:          4767           0            4767


Conclusion :

Even though we have bootup three JVM instances with altogether 25GB memory allocation it had been only used < 4674 M .

The reason is when we use G1 GC option with JVM, it doesn’t matter initial heap size (Xms) value and only matter is maximum heap size ( Xmx) . Which means when JVM bootup it doesn’t allocate Xms size of memory.



CMS GC Collector

uranadh@uranadh:~$ ps -ef | grep java
uranadh   9501 6976  1 10:07 pts/4    00:00:26 /home/uranadh/software/jdk1.8.0_131/bin/java -Xloggc:/home/uranadh/software/apache-cassandra-3.11.2/logs/gc.log -ea -XX:+UseThreadPriorities -XX:ThreadPriorityPolicy=42 -XX:+HeapDumpOnOutOfMemoryError -Xss256k -XX:StringTableSize=1000003 -XX:+AlwaysPreTouch -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+UseNUMA -XX:+PerfDisableSharedMem -Djava.net.preferIPv4Stack=true -Xms6G -Xmx6G -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=1 -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSWaitDuration=10000 -XX:+CMSParallelInitialMarkEnabled -XX:+CMSEdenChunksRecordAlways -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintHeapAtGC -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -XX:+PrintPromotionFailure -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M -Xmn400M -XX:+UseCondCardMark -XX:CompileCommandFile=/home/uranadh/software/apache-cassandra-3.11.2/conf/hotspot_compiler -javaagent:/home/uranadh/software/apache-cassandra-3.11.2/lib/jamm-0.3.0.jar -Dcassandra.jmx.local.port=7199 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.password.file=/etc/cassandra/jmxremote.password -Djava.library.path=/home/uranadh/software/apache-cassandra-3.11.2/lib/sigar-bin -XX:OnOutOfMemoryError=kill -9 %p -Dlogback.configurationFile=logback.xml -Dcassandra.logdir=/home/uranadh/software/apache-cassandra-3.11.2/logs -Dcassandra.storagedir=/home/uranadh/software/apache-cassandra-3.11.2/data -Dcassandra-foreground=yes -cp /home/uranadh/software/apache-cassandra-3.11.2/conf:/home/uranadh/software/apache-cassandra-3.11.2/build/classes/main:/home/uranadh/software/apache-cassandra-3.11.2/build/classes/thrift:/home/uranadh/software/apache-cassandra-3.11.2/lib/HdrHistogram-2.1.9.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/ST4-4.0.8.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/airline-0.6.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/antlr-runtime-3.5.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/apache-cassandra-3.11.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/apache-cassandra-thrift-3.11.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/asm-5.0.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/caffeine-2.2.6.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/cassandra-driver-core-3.0.1-shaded.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/commons-cli-1.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/commons-codec-1.9.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/commons-lang3-3.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/commons-math3-3.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/compress-lzf-0.8.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/concurrent-trees-2.4.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/concurrentlinkedhashmap-lru-1.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/disruptor-3.0.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/ecj-4.4.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/guava-18.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/high-scale-lib-1.0.6.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/hppc-0.5.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jackson-core-asl-1.9.13.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jackson-mapper-asl-1.9.13.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jamm-0.3.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/javax.inject.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jbcrypt-0.3m.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jcl-over-slf4j-1.7.7.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jctools-core-1.2.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jflex-1.6.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jna-4.2.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/joda-time-2.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/json-simple-1.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jstackjunit-0.0.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/libthrift-0.9.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/log4j-over-slf4j-1.7.7.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/logback-classic-1.1.3.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/logback-core-1.1.3.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/lz4-1.3.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/metrics-core-3.1.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/metrics-jvm-3.1.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/metrics-logback-3.1.0.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/netty-all-4.0.44.Final.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/ohc-core-0.4.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/ohc-core-j8-0.4.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/reporter-config-base-3.0.3.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/reporter-config3-3.0.3.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/sigar-1.6.4.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/slf4j-api-1.7.7.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/snakeyaml-1.11.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/snappy-java-1.1.1.7.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/snowball-stemmer-1.3.0.581.1.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/stream-2.5.2.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/thrift-server-0.3.7.jar:/home/uranadh/software/apache-cassandra-3.11.2/lib/jsr223/*/*.jar org.apache.cassandra.service.CassandraDaemon
uranadh  11160 1579  0 10:31 ?     00:00:00 eog /home/uranadh/Pictures/java-heap-CMS.png
uranadh  11315 2966  0 10:36 pts/19   00:00:00 grep --color=auto java
uranadh@uranadh:~$ free -m
                 total            used        free          shared  buff/cache   available
Mem:          15953        9882        2446         335        3624        5362
Swap:          4767           0            4767

After killing Cassandra instance

uranadh@uranadh:~$ free -m
                 total            used        free          shared  buff/cache   available
Mem:          15953        3295        9032         335        3624           11948
Swap:          4767           0            4767
uranadh@uranadh:~$


Conclusion :

We have bootup JVM instance with 6GB memory allocation and when we check OS memory usage JVM instance used 6GB of memory.. But when we monitor JVM memory utilization it only consumed few MBs only

The reason is when we use CMS GC option with JVM, it allocated initial heap size (Xms)  when JVM bootup. OS see this as already utilized memory (6GB) but really it only utilized few MBs .