Tagged: Spring

Why We Use Spring Boot Maven Plugin
30
Mar
2021

Why We Use Spring Boot Maven Plugin?

The Spring Boot Maven Plugin provides Spring Boot support in Maven, letting us package executable jar or war archives and run an application “in-place”. To use it, we need Maven 3.2 or later.

The plugin provides several goals to work with a Spring Boot application:

  • spring-boot:repackage: create a jar or war file that is auto-executable. It can replace the regular artifact or can be attached to the build lifecycle with a separate classifier.
  • spring-boot:run: run your Spring Boot application with several options to pass parameters to it.
  • spring-boot:start and spring-boot:stop: integrate your Spring Boot application into the integration-test phase so that the application starts before it and stops after it.
  • spring-boot:build-info: generate build information that can be used by the Actuator (a short sketch of how this information is consumed follows this list).
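
As an illustration of the last goal: once spring-boot:build-info has generated META-INF/build-info.properties, Spring Boot exposes its contents through a BuildProperties bean. The controller below is a minimal, hypothetical sketch of consuming it; the class name and endpoint path are not part of the plugin itself:

import org.springframework.boot.info.BuildProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller that surfaces the metadata generated by spring-boot:build-info
@RestController
public class BuildInfoController {

    private final BuildProperties buildProperties;

    public BuildInfoController(BuildProperties buildProperties) {
        this.buildProperties = buildProperties;
    }

    @GetMapping("/build-version")
    public String version() {
        // group, artifact and version are read from META-INF/build-info.properties
        return buildProperties.getGroup() + ":" + buildProperties.getArtifact()
          + ":" + buildProperties.getVersion();
    }
}

When the Actuator is on the classpath, the same build information also shows up under its info endpoint.
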
Spring-boot-starter-parent Example
30
Mar
2021

Spring Boot Starter Parent Example

In this Spring Boot tutorial, we will learn about the spring-boot-starter-parent dependency, which is used as the parent POM by Spring Boot applications. We will also learn what configurations this dependency provides, and how to override them.

What is spring-boot-starter-parent dependency?

The spring-boot-starter-parent dependency is the parent POM providing dependency and plugin management for Spring Boot-based applications. It defines the default Java version to use, the default versions of the dependencies that Spring Boot uses, and the default configuration of the Maven plugins.

A few important configurations provided by this file are shown below. Please refer to this link to read the complete configuration.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>${revision}</version>
        <relativePath>../../spring-boot-dependencies</relativePath>
    </parent>
    <artifactId>spring-boot-starter-parent</artifactId>
    <packaging>pom</packaging>
    <name>Spring Boot Starter Parent</name>
    <description>Parent pom providing dependency and plugin management for applications built with Maven</description>
    <properties>
        <java.version>1.8</java.version>
        <resource.delimiter>@</resource.delimiter> <!-- delimiter that doesn't clash with Spring ${} placeholders -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    ...
    <resource>
        <directory>${basedir}/src/main/resources</directory>
        <filtering>true</filtering>
        <includes>
            <include>**/application*.yml</include>
            <include>**/application*.yaml</include>
            <include>**/application*.properties</include>
        </includes>
    </resource>
</project>

The spring-boot-starter-parent dependency further inherits from spring-boot-dependencies, which is declared in the <parent> section near the top of the above POM file.

The spring-boot-dependencies POM is the file that actually defines the default versions to use for all managed libraries. The following code shows the different versions of various dependencies that are configured in spring-boot-dependencies:

<properties>
    <!-- Dependency versions -->
    <activemq.version>5.15.3</activemq.version>
    <antlr2.version>2.7.7</antlr2.version>
    <appengine-sdk.version>1.9.63</appengine-sdk.version>
    <artemis.version>2.4.0</artemis.version>
    <aspectj.version>1.8.13</aspectj.version>
    <assertj.version>3.9.1</assertj.version>
    <atomikos.version>4.0.6</atomikos.version>
    <bitronix.version>2.1.4</bitronix.version>
    <byte-buddy.version>1.7.11</byte-buddy.version>
    <caffeine.version>2.6.2</caffeine.version>
    <cassandra-driver.version>3.4.0</cassandra-driver.version>
    <classmate.version>1.3.4</classmate.version>
    ...
</properties>

The above list is very long; you can read the complete list at this link.

How to override a default dependency version?

As you can see, Spring Boot declares a default version for most dependencies. You can override any of these versions to suit your project's needs by redeclaring the corresponding property in the properties tag of your project's pom.xml file.

For example, Spring Boot uses 2.8.2 as the default version of Google's Gson library:

<groovy.version>2.4.14</groovy.version>
<gson.version>2.8.2</gson.version>
<h2.version>1.4.197</h2.version>

Suppose we want to use version 2.7 of the gson dependency instead. We declare the overriding version in the properties tag like this:

<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <gson.version>2.7</gson.version>
</properties>

Now, in your Eclipse editor, you will see the message: “The managed version is 2.7. The artifact is managed in org.springframework.boot:spring-boot-dependencies:2.0.0.RELEASE”.

GSON resolved dependency

Drop me your questions in the comments section.

Happy Learning !!

Persistence in Event Driven Architectures
28
Mar
2021

Persistence in Event Driven Architectures

The importance of being persistent in event driven architectures.

Enterprises have to constantly adapt and evolve their enterprise architecture strategies in order to deliver the desired business outcomes. The evolving architecture patterns may involve business processing of sales transactions with a human in the loop, or they may involve machine-to-machine data processing using automation. Enterprises previously adopted a request-driven model, in which a microservice makes a call to a service and the service responds to that request. In this request-driven model, you run into challenges around flexibility as you try to scale your global deployment footprint.

A new approach that is quickly gaining adoption in enterprises is event-driven architecture. In this approach, you can increase application agility and flexibility by allowing multiple data producers to coexist with multiple data consumers, and you process data only after an event or state change. In this enterprise architecture, the producers and consumers of data can be quickly extended to deliver better flexibility and agility as you scale your operation globally. Examples of event-driven solutions are available in a hosted, managed format from most cloud providers today. In this blog post, we will look at a Kafka solution running in a Kubernetes cluster and how you can make sure persistence is achieved for the solution. This approach is running in production at customers today, supported by Confluent and Portworx.

Why Use Event-Driven Architecture?

With the advent of 5G technology, a vast amount of data will be generated by sensors, devices, systems, and humans in the loop to track, manage, and achieve business outcomes. Use cases for event-driven architectures include the following:

  • Business Process state changes – You want to notify the change of state between a purchase order and accounts receivable with an event. The event-based approach allows a human or a decision engine to take the next appropriate step in the process.
  • Log and Metrics processing – The event-driven model allows for multiple actions to be triggered based on a single metric. The ability to send messages to multiple event handlers in different subsystems offers the scalability and resiliency required by certain business applications.

Why Use Kafka?

Apache Kafka is a scalable, fault-tolerant messaging system that enables you to build distributed real-time applications with an event-driven architecture. Kafka delivers events, with fast ingestion rates, and provides persistence and in-order guarantees. Kafka adoption in your solution will depend on your specific use-case. Below are some important concepts about Apache Kafka:

  • Kafka organizes messages into “topics”.
  • The process that does the work in Kafka is called the “broker”. A producer pushes data into a topic hosted on a broker, and a consumer pulls messages from a topic via a broker (a short producer/consumer sketch follows this list).
  • Kafka topics can be divided into “partitions”. This allows for parallelizing a topic across multiple brokers, increasing message ingestion and throughput.
  • Brokers can hold multiple partitions, but at any given time only one broker acts as the leader for a given partition. The leader is responsible for updating any replicas with new data.
  • Brokers are responsible for storing messages to disk. The messages are stored with unique offsets. Messages in Kafka do not have a unique ID.
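
To make the producer and consumer roles above concrete, here is a minimal sketch using the plain Java Kafka clients. The broker address, topic name, and consumer group are illustrative assumptions, not part of the deployment described below:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaQuickstart {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        // The producer pushes a record into the "orders" topic (topic name is an assumption).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("orders", "order-1", "created"));
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "order-processors"); // each consumer group tracks its own offsets
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "earliest");

        // The consumer pulls records from the same topic via the broker.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("orders"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("partition=%d offset=%d value=%s%n",
                    r.partition(), r.offset(), r.value()));
        }
    }
}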

Persistence With Portworx and Kafka on Kubernetes

Kafka needs ZooKeeper to be deployed as a StatefulSet in Kubernetes. The Kafka brokers, which maintain the state of the topics and partitions, also need to be deployed as StatefulSets backed by persistent volumes.

  • Kafka offers replication of topics between different brokers. In the case of a node failure, Kafka can recover from failure using the replicated topics. This recovery mechanism does create additional network calls in order to synchronize with the replica on a different broker. The recovery time of the failed node and its broker depends on the amount of data that needs to be rehydrated and the network latencies in the cluster.
  • Portworx offers data replication using the replication parameter in the Kubernetes storage class. In this scenario, the storage system is responsible for maintaining copies of the topic on different nodes. In the case of a node failure, the Kafka broker is rescheduled on a node that already contains the replicated topic data. The rebuild time of the broker is reduced because it uses the storage system to rehydrate the topic data without any network latencies. Once the data is rehydrated using the storage system, the broker can quickly catch up on the topic offset from an existing broker, thus reducing the overall recovery time.

Let’s walk through a Kubernetes node failure scenario on a Kubernetes cluster where a Kafka application is running and is backed by Portworx volumes.

Figure 1: We will describe the deployment of Kafka and Portworx on a 5 node Kubernetes cluster. In the image below, you can see that the Kafka deployment has 3 Brokers, each with 2 partitions and a replication factor of 2. For the Portworx data platform, we have a volume replication factor set to 3 for each volume.

Figure 2: We will describe a node failure event. In the diagram below, we have simulated a node failure and identified the Kubernetes worker node 2 has been taken out of production. Kubernetes worker node 2 contains a Kafka broker with 2 partitions and 2 Portworx persistent volumes.
Figure 3: Once the Kubernetes node failure is detected by the Kubernetes API server, the Kafka broker is deployed to a node with available resources. The Portworx platform has the ability to influence the placement of the broker on a Kubernetes node. Since Kubernetes node 4 already has a copy of the Kafka broker’s persistent volumes, Portworx makes sure that the Kafka broker is deployed on Kubernetes node 4. On the Portworx platform’s recovery side, the volumes are also created on other nodes in order to maintain the defined replication factor of 3.

Figure 4: Once the failed node is back in production use, the Kafka application does not get scheduled on the recovered node, since the application is already at the desired number of Kafka brokers. On the Portworx platform side, replicated volumes are placed on the recovered Kubernetes node to maintain the desired replication factor for the volumes.


A Comparison Between Spring and Spring Boot
26
Mar
2021

A Comparison Between Spring and Spring Boot

2. What Is Spring?

Simply put, the Spring framework provides comprehensive infrastructure support for developing Java applications.

It’s packed with some nice features like Dependency Injection, and out of the box modules like:

  • Spring JDBC
  • Spring MVC
  • Spring Security
  • Spring AOP
  • Spring ORM
  • Spring Test

These modules can drastically reduce the development time of an application.

For example, in the early days of Java web development, we needed to write a lot of boilerplate code to insert a record into a data source. By using the JdbcTemplate of the Spring JDBC module, we can reduce it to a few lines of code with only a few configurations.
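
As a rough sketch of that reduction (the table name, columns, and DataSource wiring here are assumed for illustration):

import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;

public class EmployeeDao {

    private final JdbcTemplate jdbcTemplate;

    public EmployeeDao(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public int insert(long id, String name) {
        // one line replaces the connection/statement/exception-handling boilerplate of plain JDBC
        return jdbcTemplate.update("INSERT INTO employee (id, name) VALUES (?, ?)", id, name);
    }
}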

3. What Is Spring Boot?

Spring Boot is basically an extension of the Spring framework, which eliminates the boilerplate configurations required for setting up a Spring application.

It takes an opinionated view of the Spring platform, which paves the way for a faster and more efficient development ecosystem.

Here are just a few of the features in Spring Boot:

  • Opinionated ‘starter’ dependencies to simplify the build and application configuration
  • Embedded server to avoid complexity in application deployment
  • Metrics, Health check, and externalized configuration
  • Automatic config for Spring functionality – whenever possible

Let’s get familiar with both of these frameworks step by step.

4. Maven Dependencies

First of all, let’s look at the minimum dependencies required to create a web application using Spring:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>5.3.5</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>5.3.5</version>
</dependency>

Unlike Spring, Spring Boot requires only one dependency to get a web application up and running:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.4.4</version>
</dependency>

All other dependencies are added automatically to the final archive during build time.

Another good example is testing libraries. We usually use the set of Spring Test, JUnit, Hamcrest, and Mockito libraries. In a Spring project, we should add all of these libraries as dependencies.

Alternatively, in Spring Boot we only need the starter dependency for testing to automatically include these libraries.

Spring Boot provides a number of starter dependencies for different Spring modules. Some of the most commonly used ones are:

  • spring-boot-starter-data-jpa
  • spring-boot-starter-security
  • spring-boot-starter-test
  • spring-boot-starter-web
  • spring-boot-starter-thymeleaf

For the full list of starters, also check out the Spring documentation.

5. MVC Configuration

Let’s explore the configuration required to create a JSP web application using both Spring and Spring Boot.

Spring requires defining the dispatcher servlet, mappings, and other supporting configurations. We can do this using either the web.xml file or an Initializer class:

public class MyWebAppInitializer implements WebApplicationInitializer {
 
    @Override
    public void onStartup(ServletContext container) {
        AnnotationConfigWebApplicationContext context
          = new AnnotationConfigWebApplicationContext();
        context.setConfigLocation("com.baeldung");
 
        container.addListener(new ContextLoaderListener(context));
 
        ServletRegistration.Dynamic dispatcher = container
          .addServlet("dispatcher", new DispatcherServlet(context));
         
        dispatcher.setLoadOnStartup(1);
        dispatcher.addMapping("/");
    }
}

We also need to add the @EnableWebMvc annotation to a @Configuration class, and define a view-resolver to resolve the views returned from the controllers:

@EnableWebMvc
@Configuration
public class ClientWebConfig implements WebMvcConfigurer { 
   @Bean
   public ViewResolver viewResolver() {
      InternalResourceViewResolver bean
        = new InternalResourceViewResolver();
      bean.setViewClass(JstlView.class);
      bean.setPrefix("/WEB-INF/view/");
      bean.setSuffix(".jsp");
      return bean;
   }
}

By comparison, Spring Boot only needs a couple of properties to make things work once we add the web starter:

spring.mvc.view.prefix=/WEB-INF/jsp/
spring.mvc.view.suffix=.jsp

All of the Spring configuration above is automatically included by adding the Boot web starter through a process called auto-configuration.

What this means is that Spring Boot will look at the dependencies, properties, and beans that exist in the application and enable configuration based on these.

Of course, if we want to add our own custom configuration, then the Spring Boot auto-configuration will back away.
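For instance, here is a minimal sketch of that backing-off behaviour: Boot registers its default InternalResourceViewResolver only if no such bean exists, so declaring our own replaces the auto-configured one (the prefix shown is just an example value):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

@Configuration
public class CustomViewResolverConfig {

    // Because this bean is present, the auto-configured InternalResourceViewResolver backs off
    @Bean
    public InternalResourceViewResolver viewResolver() {
        InternalResourceViewResolver resolver = new InternalResourceViewResolver();
        resolver.setPrefix("/WEB-INF/custom/");   // example prefix, instead of spring.mvc.view.prefix
        resolver.setSuffix(".jsp");
        return resolver;
    }
}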

5.1. Configuring Template Engine

Now let’s learn how to configure a Thymeleaf template engine in both Spring and Spring Boot.

In Spring, we need to add the thymeleaf-spring5 dependency and some configurations for the view resolver:

@Configuration
@EnableWebMvc
public class MvcWebConfig implements WebMvcConfigurer {

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    public SpringResourceTemplateResolver templateResolver() {
        SpringResourceTemplateResolver templateResolver = 
          new SpringResourceTemplateResolver();
        templateResolver.setApplicationContext(applicationContext);
        templateResolver.setPrefix("/WEB-INF/views/");
        templateResolver.setSuffix(".html");
        return templateResolver;
    }

    @Bean
    public SpringTemplateEngine templateEngine() {
        SpringTemplateEngine templateEngine = new SpringTemplateEngine();
        templateEngine.setTemplateResolver(templateResolver());
        templateEngine.setEnableSpringELCompiler(true);
        return templateEngine;
    }

    @Override
    public void configureViewResolvers(ViewResolverRegistry registry) {
        ThymeleafViewResolver resolver = new ThymeleafViewResolver();
        resolver.setTemplateEngine(templateEngine());
        registry.viewResolver(resolver);
    }
}

Spring Boot 1 only requires the spring-boot-starter-thymeleaf dependency to enable Thymeleaf support in a web application. Because of the new features in Thymeleaf 3.0, we also have to add thymeleaf-layout-dialect as a dependency in a Spring Boot 2 web application. Alternatively, we can choose to add a spring-boot-starter-thymeleaf dependency that’ll take care of all of this for us.

Once the dependencies are in place, we can add the templates to the src/main/resources/templates folder, and Spring Boot will display them automatically.
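
For example, with the starter on the classpath, a controller only has to return a view name. A hypothetical sketch, assuming a template greeting.html exists under src/main/resources/templates:

import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;

@Controller
public class GreetingController {

    // Resolved by the auto-configured Thymeleaf resolver to templates/greeting.html
    @GetMapping("/greeting")
    public String greeting(Model model) {
        model.addAttribute("name", "World"); // attribute read by the template
        return "greeting";
    }
}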

6. Spring Security Configuration

For the sake of simplicity, we’ll see how the default HTTP Basic authentication is enabled using these frameworks.

Let’s start by looking at the dependencies and configuration we need to enable Security using Spring.

Spring requires both the standard spring-security-web and spring-security-config dependencies to set up Security in an application.

Next we need to add a class that extends the WebSecurityConfigurerAdapter and makes use of the @EnableWebSecurity annotation:

@Configuration
@EnableWebSecurity
public class CustomWebSecurityConfigurerAdapter extends WebSecurityConfigurerAdapter {
 
    @Autowired
    public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
        auth.inMemoryAuthentication()
          .withUser("user1")
            .password(passwordEncoder()
            .encode("user1Pass"))
          .authorities("ROLE_USER");
    }
 
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.authorizeRequests()
          .anyRequest().authenticated()
          .and()
          .httpBasic();
    }
    
    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }
}

Here we’re using inMemoryAuthentication to set up the authentication.

Spring Boot also requires these dependencies to make it work, but we only need to define the dependency of spring-boot-starter-security as this will automatically add all the relevant dependencies to the classpath.

The security configuration in Spring Boot is the same as the one above.

To see how the JPA configuration can be achieved in both Spring and Spring Boot, we can check out our article A Guide to JPA with Spring.

7. Application Bootstrap

The basic difference in bootstrapping an application in Spring and Spring Boot lies with the servlet. Spring uses either the web.xml or SpringServletContainerInitializer as its bootstrap entry point.

On the other hand, Spring Boot uses only Servlet 3 features to bootstrap an application. Let’s talk about this in detail.

7.1. How Does Spring Bootstrap?

Spring supports both the legacy web.xml way of bootstrapping as well as the latest Servlet 3+ method.

Let’s see the web.xml approach in steps:

  1. Servlet container (the server) reads web.xml.
  2. The DispatcherServlet defined in the web.xml is instantiated by the container.
  3. DispatcherServlet creates WebApplicationContext by reading WEB-INF/{servletName}-servlet.xml.
  4. Finally, the DispatcherServlet registers the beans defined in the application context.

Here’s how Spring bootstraps using the Servlet 3+ approach:

  1. The container searches for classes implementing ServletContainerInitializer and executes them.
  2. The SpringServletContainerInitializer finds all classes implementing WebApplicationInitializer.
  3. The WebApplicationInitializer creates the context with XML or @Configuration classes.
  4. The WebApplicationInitializer creates the DispatcherServlet with the previously created context.

7.2. How Does Spring Boot Bootstrap?

The entry point of a Spring Boot application is the class which is annotated with @SpringBootApplication:

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

By default, Spring Boot uses an embedded container to run the application. In this case, Spring Boot uses the public static void main entry point to launch an embedded web server.

It also takes care of the binding of the Servlet, Filter, and ServletContextInitializer beans from the application context to the embedded servlet container.

Another feature of Spring Boot is that it automatically scans all the classes in the same package or sub packages of the Main-class for components.

Additionally, Spring Boot provides the option of deploying it as a web archive in an external container. In this case, we have to extend the SpringBootServletInitializer:

@SpringBootApplication
public class Application extends SpringBootServletInitializer {
    // ...
}

Here the external servlet container looks for the Main-class defined in the META-INF file of the web archive, and the SpringBootServletInitializer will take care of binding the Servlet, Filter, and ServletContextInitializer.

8. Packaging and Deployment

Finally, let’s see how an application can be packaged and deployed. Both of these frameworks support common build technologies like Maven and Gradle; however, when it comes to deployment, these frameworks differ a lot.

For instance, the Spring Boot Maven Plugin provides Spring Boot support in Maven. It also allows packaging executable jar or war archives and running an application “in-place.”

Some of the advantages of Spring Boot over Spring in the context of deployment include:

  • Provides embedded container support
  • Provision to run the jars independently using the command java -jar
  • Option to exclude dependencies to avoid potential jar conflicts when deploying in an external container
  • Option to specify active profiles when deploying
  • Random port generation for integration tests

9. Conclusion

In this article, we learned about the differences between Spring and Spring Boot.

In a few words, we can say that Spring Boot is simply an extension of Spring itself to make development, testing, and deployment more convenient.

A Guide to JPA with Spring
26
Mar
2021

A Guide to JPA with Spring

This tutorial shows how to set up Spring with JPA, using Hibernate as a persistence provider.

For a step by step introduction about setting up the Spring context using Java-based configuration and the basic Maven pom for the project, see this article.

We’ll start by setting up JPA in a Spring Boot project, then we’ll look into the full configuration we need if we have a standard Spring project.

2. JPA in Spring Boot

The Spring Boot project is intended to make creating Spring applications much faster and easier. This is done with the use of starters and auto-configuration for various Spring functionalities, JPA among them.

2.1. Maven Dependencies

To enable JPA in a Spring Boot application, we need the spring-boot-starter and spring-boot-starter-data-jpa dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>

The spring-boot-starter contains the necessary auto-configuration for Spring JPA. Also, the spring-boot-starter-data-jpa project references all the necessary dependencies, such as hibernate-core.

2.2. Configuration

Spring Boot configures Hibernate as the default JPA provider, so it’s no longer necessary to define the entityManagerFactory bean unless we want to customize it.

Spring Boot can also auto-configure the dataSource bean, depending on the database we’re using. In the case of the in-memory databases H2, HSQLDB, and Apache Derby, Boot automatically configures the DataSource if the corresponding database dependency is present on the classpath.

For example, if we want to use an in-memory H2 database in a Spring Boot JPA application, we only need to add the h2 dependency to the pom.xml file:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.200</version>
</dependency>

This way, we don’t need to define the dataSource bean, but we can do so if we want to customize it.

If we want to use JPA with a MySQL database, then we need the mysql-connector-java dependency, as well as a DataSource configuration.

We can do this in a @Configuration class, or by using standard Spring Boot properties.

The Java configuration looks the same as it does in a standard Spring project:

@Bean
public DataSource dataSource() {
    DriverManagerDataSource dataSource = new DriverManagerDataSource();

    dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
    dataSource.setUsername("mysqluser");
    dataSource.setPassword("mysqlpass");
    dataSource.setUrl(
      "jdbc:mysql://localhost:3306/myDb?createDatabaseIfNotExist=true"); 
    
    return dataSource;
}

To configure the data source using a properties file, we have to set properties prefixed with spring.datasource:

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=mysqluser
spring.datasource.password=mysqlpass
spring.datasource.url=
  jdbc:mysql://localhost:3306/myDb?createDatabaseIfNotExist=true

Spring Boot will automatically configure a data source based on these properties.

Also in Spring Boot 1, the default connection pool was Tomcat, but with Spring Boot 2 it has been changed to HikariCP.

As we can see, the basic JPA configuration is fairly simple if we’re using Spring Boot.

However, if we have a standard Spring project, then we need more explicit configuration, using either Java or XML. That’s what we’ll focus on in the next sections.

3. The JPA Spring Configuration With Java – in a Non-Boot Project

To use JPA in a Spring project, we need to set up the EntityManager.

This is the main part of the configuration and we can do it via a Spring factory bean. This can be either the simpler LocalEntityManagerFactoryBean or the more flexible LocalContainerEntityManagerFactoryBean.

Let’s see how we can use the latter option:

@Configuration
@EnableTransactionManagement
public class PersistenceJPAConfig{

   @Bean
   public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
      LocalContainerEntityManagerFactoryBean em 
        = new LocalContainerEntityManagerFactoryBean();
      em.setDataSource(dataSource());
      em.setPackagesToScan(new String[] { "com.baeldung.persistence.model" });

      JpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
      em.setJpaVendorAdapter(vendorAdapter);
      em.setJpaProperties(additionalProperties());

      return em;
   }
   
   // ...

}

We also need to explicitly define the DataSource bean we’ve used above:

@Bean
public DataSource dataSource(){
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/spring_jpa");
    dataSource.setUsername( "tutorialuser" );
    dataSource.setPassword( "tutorialmy5ql" );
    return dataSource;
}

The final part of the configuration is the additional Hibernate properties plus the TransactionManager and exceptionTranslation beans:

@Bean
public PlatformTransactionManager transactionManager() {
    JpaTransactionManager transactionManager = new JpaTransactionManager();
    transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());

    return transactionManager;
}

@Bean
public PersistenceExceptionTranslationPostProcessor exceptionTranslation(){
    return new PersistenceExceptionTranslationPostProcessor();
}

Properties additionalProperties() {
    Properties properties = new Properties();
    properties.setProperty("hibernate.hbm2ddl.auto", "create-drop");
    properties.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL5Dialect");
       
    return properties;
}

4. The JPA Spring Configuration With XML

Next, let’s see the same Spring Configuration with XML:

<bean id="myEmf" 
  class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="packagesToScan" value="com.baeldung.persistence.model" />
    <property name="jpaVendorAdapter">
        <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter" />
    </property>
    <property name="jpaProperties">
        <props>
            <prop key="hibernate.hbm2ddl.auto">create-drop</prop>
            <prop key="hibernate.dialect">org.hibernate.dialect.MySQL5Dialect</prop>
        </props>
    </property>
</bean>

<bean id="dataSource" 
  class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://localhost:3306/spring_jpa" />
    <property name="username" value="tutorialuser" />
    <property name="password" value="tutorialmy5ql" />
</bean>

<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="myEmf" />
</bean>
<tx:annotation-driven />

<bean id="persistenceExceptionTranslationPostProcessor" class=
  "org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor" />

There’s a relatively small difference between the XML and the new Java-based configuration. Namely, in XML, a reference to another bean can point to either the bean or a bean factory for that bean.

In Java, however, since the types are different, the compiler doesn’t allow it, and so the EntityManagerFactory is first retrieved from its bean factory and then passed to the transaction manager:

transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());

5. Going Full XML-less

Usually, JPA defines a persistence unit through the META-INF/persistence.xml file. Starting with Spring 3.1, the persistence.xml is no longer necessary. The LocalContainerEntityManagerFactoryBean now supports a packagesToScan property where the packages to scan for @Entity classes can be specified.

This file was the last piece of XML we need to remove. We can now set up JPA fully with no XML.

We would usually specify JPA properties in the persistence.xml file. Alternatively, we can add the properties directly to the entity manager factory bean:

factoryBean.setJpaProperties(this.additionalProperties());

As a side note, if Hibernate is the persistence provider, then this is also the way to specify Hibernate-specific properties.

6. The Maven Configuration

In addition to the Spring Core and persistence dependencies – shown in detail in the Spring with Maven tutorial – we also need to define JPA and Hibernate in the project, as well as a MySQL connector:

<dependency>
   <groupId>org.hibernate</groupId>
   <artifactId>hibernate-core</artifactId>
   <version>5.2.17.Final</version>
   <scope>runtime</scope>
</dependency>

<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>8.0.19</version>
   <scope>runtime</scope>
</dependency>

Note that the MySQL dependency is included here as an example. We need a driver to configure the datasource, but any Hibernate-supported database will do.

7. Conclusion

This tutorial illustrated how to configure JPA with Hibernate in Spring in both a Spring Boot and a standard Spring application.

Intro to Spring Boot Starters
26
Mar
2021

Intro to Spring Boot Starters

1. Overview

Dependency management is a critical aspect of any complex project. And doing this manually is less than ideal; the more time you spend on it, the less time you have for the other important aspects of the project.

Spring Boot starters were built to address exactly this problem. Starter POMs are a set of convenient dependency descriptors that you can include in your application. You get a one-stop-shop for all the Spring and related technology that you need, without having to hunt through sample code and copy-paste loads of dependency descriptors.

We have more than 30 Boot starters available – let’s see some of them in the following sections.

2. The Web Starter

First, let’s look at developing a REST service; we need libraries like Spring MVC, Tomcat, and Jackson – a lot of dependencies for a single application.

Spring Boot starters can help to reduce the number of manually added dependencies by adding just one dependency. So instead of manually specifying the dependencies, just add one starter as in the following example:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

Now we can create a REST controller. For the sake of simplicity, we won’t use a database and will focus on the REST controller:

@RestController
public class GenericEntityController {
    private List<GenericEntity> entityList = new ArrayList<>();

    @RequestMapping("/entity/all")
    public List<GenericEntity> findAll() {
        return entityList;
    }

    @RequestMapping(value = "/entity", method = RequestMethod.POST)
    public GenericEntity addEntity(GenericEntity entity) {
        entityList.add(entity);
        return entity;
    }

    @RequestMapping("/entity/findby/{id}")
    public GenericEntity findById(@PathVariable Long id) {
        return entityList.stream()
                 .filter(entity -> entity.getId().equals(id))
                 .findFirst()
                 .get();
    }
}

The GenericEntity is a simple bean with id of type Long and value of type String.

That’s it – with the application running, you can access http://localhost:8080/entity/all and check the controller is working.

We have created a REST application with quite a minimal configuration.

3. The Test Starter

For testing we usually use the following set of libraries: Spring Test, JUnit, Hamcrest, and Mockito. We can include all of these libraries manually, but Spring Boot starter can be used to automatically include these libraries in the following way:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

Notice that you don’t need to specify the version number of an artifact. Spring Boot will figure out what version to use – all you need to specify is the version of the spring-boot-starter-parent artifact. If later on you need to upgrade the Boot library and dependencies, just upgrade the Boot version in one place and it will take care of the rest.

Let’s actually test the controller we created in the previous example.

There are two ways to test the controller:

  • Using the mock environment
  • Using the embedded Servlet container (like Tomcat or Jetty)

In this example we’ll use a mock environment:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration
public class SpringBootApplicationIntegrationTest {
    @Autowired
    private WebApplicationContext webApplicationContext;
    private MockMvc mockMvc;

    @Before
    public void setupMockMvc() {
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
    }

    @Test
    public void givenRequestHasBeenMade_whenMeetsAllOfGivenConditions_thenCorrect()
      throws Exception { 
        MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(),
          MediaType.APPLICATION_JSON.getSubtype(), Charset.forName("utf8"));
        mockMvc.perform(MockMvcRequestBuilders.get("/entity/all"))
          .andExpect(MockMvcResultMatchers.status().isOk())
          .andExpect(MockMvcResultMatchers.content().contentType(contentType))
          .andExpect(jsonPath("$", hasSize(4)));
    } 
}

The above test calls the /entity/all endpoint and verifies that the JSON response contains 4 elements. For this test to pass, we also have to initialize our list in the controller class:

public class GenericEntityController {
    private List<GenericEntity> entityList = new ArrayList<>();

    {
        entityList.add(new GenericEntity(1L, "entity_1"));
        entityList.add(new GenericEntity(2L, "entity_2"));
        entityList.add(new GenericEntity(3L, "entity_3"));
        entityList.add(new GenericEntity(4L, "entity_4"));
    }
    //...
}

What is important here is that the @WebAppConfiguration annotation and MockMvc are part of the spring-test module, hasSize is a Hamcrest matcher, and @Before is a JUnit annotation. These are all available by importing this one starter dependency.

4. The Data JPA Starter

Most web applications have some sort of persistence – and that’s quite often JPA.

Instead of defining all of the associated dependencies manually – let’s go with the starter instead:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

Notice that out of the box we have automatic support for at least the following databases: H2, Derby, and HSQLDB. In our example, we’ll use H2.

Now let’s create the repository for our entity:

public interface GenericEntityRepository extends JpaRepository<GenericEntity, Long> {}

Time to test the code. Here is the JUnit test:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
public class SpringBootJPATest {
    
    @Autowired
    private GenericEntityRepository genericEntityRepository;

    @Test
    public void givenGenericEntityRepository_whenSaveAndRetrieveEntity_thenOK() {
        GenericEntity genericEntity =
          genericEntityRepository.save(new GenericEntity("test"));
        GenericEntity foundEntity =
          genericEntityRepository.findOne(genericEntity.getId());

        assertNotNull(foundEntity);
        assertEquals(genericEntity.getValue(), foundEntity.getValue());
    }
}

We didn’t spend time on specifying the database vendor, URL connection, and credentials. No extra configuration is necessary as we’re benefiting from the solid Boot defaults; but of course all of these details can still be configured if necessary.

5. The Mail Starter

A very common task in enterprise development is sending email, and dealing with the Java Mail API directly can be difficult.

Spring Boot starter hides this complexity – mail dependencies can be specified in the following way:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>

Now we can directly use the JavaMailSender, so let’s write some tests.

For testing purposes, we need a simple SMTP server. In this example, we’ll use Wiser. This is how we can include it in our POM:

<dependency>
    <groupId>org.subethamail</groupId>
    <artifactId>subethasmtp</artifactId>
    <version>3.1.7</version>
    <scope>test</scope>
</dependency>

Here is the source code for the test:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
public class SpringBootMailTest {
    @Autowired
    private JavaMailSender javaMailSender;

    private Wiser wiser;

    private String userTo = "user2@localhost";
    private String userFrom = "user1@localhost";
    private String subject = "Test subject";
    private String textMail = "Text subject mail";

    @Before
    public void setUp() throws Exception {
        final int TEST_PORT = 25;
        wiser = new Wiser(TEST_PORT);
        wiser.start();
    }

    @After
    public void tearDown() throws Exception {
        wiser.stop();
    }

    @Test
    public void givenMail_whenSendAndReceived_thenCorrect() throws Exception {
        SimpleMailMessage message = composeEmailMessage();
        javaMailSender.send(message);
        List<WiserMessage> messages = wiser.getMessages();

        assertThat(messages, hasSize(1));
        WiserMessage wiserMessage = messages.get(0);
        assertEquals(userFrom, wiserMessage.getEnvelopeSender());
        assertEquals(userTo, wiserMessage.getEnvelopeReceiver());
        assertEquals(subject, getSubject(wiserMessage));
        assertEquals(textMail, getMessage(wiserMessage));
    }

    private String getMessage(WiserMessage wiserMessage)
      throws MessagingException, IOException {
        return wiserMessage.getMimeMessage().getContent().toString().trim();
    }

    private String getSubject(WiserMessage wiserMessage) throws MessagingException {
        return wiserMessage.getMimeMessage().getSubject();
    }

    private SimpleMailMessage composeEmailMessage() {
        SimpleMailMessage mailMessage = new SimpleMailMessage();
        mailMessage.setTo(userTo);
        mailMessage.setReplyTo(userFrom);
        mailMessage.setFrom(userFrom);
        mailMessage.setSubject(subject);
        mailMessage.setText(textMail);
        return mailMessage;
    }
}

In the test, the @Before and @After methods are in charge of starting and stopping the mail server.

Notice that we’re wiring in the JavaMailSender bean – the bean was automatically created by Spring Boot.

Just like any other defaults in Boot, the email settings for the JavaMailSender can be customized in application.properties:

spring.mail.host=localhost
spring.mail.port=25
spring.mail.properties.mail.smtp.auth=false

So we configured the mail server on localhost:25 and we didn’t require authentication.

6. Conclusion

In this article we have given an overview of starters, explained why we need them, and provided examples of how to use them in your projects.

Let’s recap the benefits of using Spring Boot starters:

  • increased POM manageability
  • production-ready, tested and supported dependency configurations
  • decreased overall configuration time for the project
How to Change the Default Port in Spring Boot
26
Mar
2021

How to Change the Default Port in Spring Boot

Spring Boot provides sensible defaults for many configuration properties. But we sometimes need to customize these with our case-specific values.

And a common use case is changing the default port for the embedded server.

In this quick tutorial, we’ll cover several ways to achieve this.

2. Using Property Files

The fastest and easiest way to customize Spring Boot is by overriding the values of the default properties.

For the server port, the property we want to change is server.port.

By default, the embedded server starts on port 8080.

So, let’s see how to provide a different value in an application.properties file:

server.port=8081

Now the server will start on port 8081.

And we can do the same if we’re using an application.yml file:

server:
  port: 8081

Both files are loaded automatically by Spring Boot if placed in the src/main/resources directory of a Maven application.

2.1. Environment-Specific Ports

If we have an application deployed in different environments, we may want it to run on different ports on each system.

We can easily achieve this by combining the property files approach with Spring profiles. Specifically, we can create a property file for each environment.

For example, we’ll have an application-dev.properties file with this content:

server.port=8081

Then we’ll add another application-qa.properties file with a different port:

server.port=8082

Now, the property files configuration should be sufficient for most cases. However, there are other options for this goal, so let’s explore them as well.

3. Programmatic Configuration

We can configure the port programmatically either by setting the specific property when starting the application or by customizing the embedded server configuration.

First, let’s see how to set the property in the main @SpringBootApplication class:

@SpringBootApplication
public class CustomApplication {
    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(CustomApplication.class);
        app.setDefaultProperties(Collections
          .singletonMap("server.port", "8083"));
        app.run(args);
    }
}

Next, to customize the server configuration, we have to implement the WebServerFactoryCustomizer interface:

@Component
public class ServerPortCustomizer 
  implements WebServerFactoryCustomizer<ConfigurableWebServerFactory> {
 
    @Override
    public void customize(ConfigurableWebServerFactory factory) {
        factory.setPort(8086);
    }
}

Note that this applies to the Spring Boot 2.x version.

For Spring Boot 1.x, we can similarly implement the EmbeddedServletContainerCustomizer interface.

4. Using Command-Line Arguments

When packaging and running our application as a jar, we can set the server.port argument with the java command:

java -jar spring-5.jar --server.port=8083

or by using the equivalent syntax:

java -jar -Dserver.port=8083 spring-5.jar

5. Order of Evaluation

As a final note, let’s look at the order in which these approaches are evaluated by Spring Boot.

Basically, the configuration priority is as follows:

  • embedded server configuration
  • command-line arguments
  • property files
  • main @SpringBootApplication configuration

6. Conclusion

In this article, we saw how to configure the server port in a Spring Boot application.

Why Kafka Is so Fast
26
Mar
2021

Why Kafka Is so Fast

Discover the deliberate design decisions that have made Kafka the performance powerhouse it is today.

The last few years have brought about immense changes in the software architecture landscape. The notion of a single monolithic application or even several coarse-grained services sharing a common data store has been all but erased from the hearts and minds of software practitioners world-wide. Autonomous microservices, event-driven architecture, and CQRS are the dominant tools in the construction of contemporary business-centric applications. To top it off, the proliferation of device connectivity — IoT, mobile, wearables — is creating an upward pressure on the number of events a system must handle in near-real-time.

Let’s start by acknowledging that the term ‘fast’ is multi-faceted, complex, and highly ambiguous. Latency, throughput, and jitter are metrics that shape and influence one’s interpretation of the term. It is also inherently contextual: the industry and application domains in themselves set the norms and expectations around performance. Whether or not something is fast depends largely on one’s frame of reference.

Apache Kafka is optimized for throughput at the expense of latency and jitter, while preserving other desirable qualities, such as durability, strict record order, and at-least-once delivery semantics. When someone says ‘Kafka is fast’, and assuming they are at least mildly competent, you can assume they are referring to Kafka’s ability to safely accumulate and distribute a very high number of records in a short amount of time.

Historically, Kafka was born out of LinkedIn’s need to move a very large number of messages efficiently, amounting to multiple terabytes of data on an hourly basis. The individual message propagation delay was deemed of secondary importance, as was the variability of that time. After all, LinkedIn is not a financial institution that engages in high-frequency trading, nor is it an industrial control system that operates within deterministic deadlines. Kafka can be used to implement near-real-time (otherwise known as soft real-time) systems.

Note: For those unfamiliar with the term, ‘real-time’ does not mean ‘fast’, it means ‘predictable’. Specifically, real-time implies a hard upper bound, otherwise known as a deadline, on the time taken to complete an action. If the system as a whole is unable to meet this deadline each and every time, it cannot be classed as real-time. Systems that are able to perform within a probabilistic tolerance are labelled as ‘near-real-time’. In terms of sheer throughput, real-time systems are often slower than their near-real-time or non-real-time counterparts.

There are two significant areas that Kafka draws upon for its speed, and they need to be discussed separately. The first relates to the low-level efficiency of the client and broker implementations. The second derives from the opportunistic parallelism of stream processing.

Broker performance

Log-structured persistence

Kafka utilizes a segmented, append-only log, largely limiting itself to sequential I/O for both reads and writes, which is fast across a wide variety of storage media. There is a wide misconception that disks are slow; however, the performance of storage media (particularly rotating media) is greatly dependent on access patterns. The performance of random I/O on a typical 7,200 RPM SATA disk is between three and four orders of magnitude slower when compared to sequential I/O. Furthermore, a modern operating system provides read-ahead and write-behind techniques that prefetch data in large block multiples and group smaller logical writes into large physical writes. Because of this, the difference between sequential I/O and random I/O is still evident in flash and other forms of solid-state non-volatile media, although it is far less dramatic compared to rotating media.

Record batching

Sequential I/O is blazingly fast on most media types, comparable to the peak performance of network I/O. In practice, this means that a well-designed log-structured persistence layer will keep up with the network traffic. In fact, quite often the bottleneck with Kafka isn’t the disk, but the network. So in addition to the low-level batching provided by the OS, Kafka clients and brokers will accumulate multiple records in a batch — for both reading and writing — before sending them over the network. Batching of records amortizes the overhead of the network round-trip, using larger packets and improving bandwidth efficiency.

Batch compression

The impact of batching is particularly obvious when compression is enabled, as compression becomes generally more effective as the data size increases. Especially when using text-based formats such as JSON, the effects of compression can be quite pronounced, with compression ratios typically ranging from 5x to 7x. Furthermore, record batching is largely done as a client-side operation, which transfers the load onto the client and has a positive effect not only on the network bandwidth but also on the brokers’ disk I/O utilization.
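
As a rough illustration, batching and compression are governed by producer settings such as linger.ms, batch.size, and compression.type. The values below are arbitrary examples for a hypothetical producer, not tuning recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducerFactory {

    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");           // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("linger.ms", "20");                                // wait up to 20 ms to fill a batch
        props.put("batch.size", String.valueOf(64 * 1024));          // up to 64 KB per partition batch
        props.put("compression.type", "lz4");                        // compress each batch on the client
        return new KafkaProducer<>(props);
    }
}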

Cheap consumers

Unlike traditional MQ-style brokers which remove messages at point of consumption (incurring the penalty of random I/O), Kafka doesn’t remove messages after they are consumed — instead, it independently tracks offsets at each consumer group level. The progression of offsets themselves is published on an internal Kafka topic __consumer_offsets. Again, being an append-only operation, this is fast. The contents of this topic are further reduced in the background (using Kafka’s compaction feature) to only retain the last known offsets for any given consumer group.
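
A short sketch of what that offset tracking looks like from the consumer’s side; the committed positions are what end up on __consumer_offsets (the broker address, group id, and topic are illustrative):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetCommittingConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // assumed broker address
        props.put("group.id", "billing");                       // offsets are tracked per consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");               // commit explicitly below

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));   // illustrative topic
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
            consumer.commitSync();   // appends the new positions to __consumer_offsets
        }
    }
}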

Compare this model with more traditional message brokers which typically offer several contrasting message distribution topologies. On one hand is the message queue — a durable transport for point-to-point messaging, with no point-to-multipoint ability. On the other hand, a pub-sub topic allows for point-to-multipoint messaging but does so at the expense of durability. Implementing a durable point-to-multipoint messaging model in a traditional MQ requires maintaining a dedicated message queue for each stateful consumer. This creates both read and write amplification. On one hand, the publisher is forced to write to multiple queues. Alternatively, a fan-out relay may consume records from one queue and write to several others, but this only defers the point of amplification. On the other hand, several consumers are generating load on the broker — being a mixture of read and write I/O, both sequential and random.

Consumers in Kafka are ‘cheap’, insofar as they don’t mutate the log files (only the producer or internal Kafka processes are permitted to do that). This means that a large number of consumers may concurrently read from the same topic without overwhelming the cluster. There is still some cost in adding a consumer, but it is mostly sequential reads with a low rate of sequential writes. So it’s fairly normal to see a single topic being shared across a diverse consumer ecosystem.

Unflushed buffered writes

Another fundamental reason for Kafka’s performance, and one that is worth exploring further: Kafka doesn’t actually call fsync when writing to the disk before acknowledging the write; the only requirement for an ACK is that the record has been written to the I/O buffer. This is a little known fact, but a crucial one: in fact, this is what actually makes Kafka perform as if it were an in-memory queue — because for all intents and purposes Kafka is a disk-backed in-memory queue (limited by the size of the buffer/pagecache).

On the flip side, this form of writing is unsafe, as the failure of a replica can lead to data loss even though the record has seemingly been acknowledged. In other words, unlike say a relational database, acknowledging a write alone does not imply durability. What makes Kafka durable is running several in-sync replicas; even if one were to fail, the others (assuming there is more than one) will remain operational — providing that the failure is uncorrelated (i.e. multiple replicas failing simultaneously due to a common upstream failure). So the combination of a non-blocking approach to I/O with no fsync, and redundant in-sync replicas, gives Kafka its combination of high throughput, durability, and availability.

Client-side optimisations

Most databases, queues, and other forms of persistent middleware are designed around the notion of an all-mighty server (or a cluster of servers), and fairly thin clients that communicate with the server(s) over a well-known wire protocol. Client implementations are generally considered to be significantly simpler than the server. As a result, the server will absorb the bulk of the load — the clients merely act as interfaces between the application code and the server.

Kafka takes a different approach to client design. A significant amount of work is performed on the client before records get to the server. This includes the staging of records in an accumulator, hashing the record keys to arrive at the correct partition index, checksumming the records, and compressing the record batch. The client is aware of the cluster metadata and periodically refreshes this metadata to keep abreast of any changes to the broker topology. This lets the client make low-level forwarding decisions; rather than sending a record blindly to the cluster and relying on the latter to forward it to the appropriate broker node, a producer client will forward writes directly to partition masters. Similarly, consumer clients are able to make intelligent decisions when sourcing records, potentially using replicas that are geographically closer to the client when issuing read queries. (This feature is a more recent addition to Kafka, available as of version 2.4.0.)

Zero-copy

One of the typical sources of inefficiencies is copying byte data between buffers. Kafka uses a binary message format that is shared by the producer, the broker, and the consumer parties so that data chunks can flow end-to-end without modification, even if it’s compressed. While eliminating structural differences between communicating parties is an important step, it doesn’t in itself avoid the copying of data.

Kafka solves this problem on Linux and UNIX systems by using Java’s NIO framework, specifically, the transferTo() method of a java.nio.channels.FileChannel. This method permits the transfer of bytes from a source channel to a sink channel without involving the application as a transfer intermediary. To appreciate the difference that NIO makes, consider the traditional approach where a source channel is read into a byte buffer, then written to a sink channel as two separate operations:

File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

Diagrammatically, this can be represented using the following.

Although this looks simple enough, internally, the copy operation requires four context switches between user mode and kernel mode, and the data is copied four times before the operation is complete. The diagram below outlines the context switches at each step.

Looking at this in more detail —

  1. The initial read() causes a context switch from user mode to kernel mode. The file is read, and its contents are copied to a buffer in the kernel address space by the DMA (Direct Memory Access) engine. This is not the same buffer that was used in the code snippet.
  2. Prior to returning from read(), the kernel buffer is copied into the user-space buffer. At this point, our application can read the contents of the file.
  3. The subsequent send() will switch back into kernel mode, copying the user-space buffer into the kernel address space — this time into a different buffer associated with the destination socket. Behind the scenes, the DMA engine takes over, asynchronously copying the data from the kernel buffer to the protocol stack. The send() method does not wait for this prior to returning.
  4. The send() call returns, switching back to the user-space context.

In spite of its mode-switching inefficiencies and additional copying, the intermediate kernel buffer can actually improve performance in many cases. It can act as a read-ahead cache, asynchronously prefetching blocks, thereby front-running requests from the application. However, when the amount of requested data is significantly larger than the kernel buffer size, the kernel buffer becomes a performance bottleneck. Rather than copying the data directly, it forces the system to oscillate between user and kernel modes until all the data is transferred.

By contrast, the zero-copy approach is handled in a single operation. The snippet from the earlier example can be rewritten as a one-liner:

fileDesc.transferTo(offset, len, socket);

The zero-copy approach is illustrated below.

Under this model, the entire transfer is performed with a single transferTo() call. Specifically, the method instructs the block device to read data into a read buffer by the DMA engine. This buffer is then copied to another kernel buffer for staging to the socket. Finally, the socket buffer is copied to the NIC buffer by DMA.

As a result, we have reduced the number of copies from four to three, and only one of those copies involves the CPU. We have also reduced the number of context switches from four to two.

This is a massive improvement, but it’s not quite zero-copy yet. The latter can be achieved as a further optimization when running Linux kernels 2.4 and later, and on network interface cards that support the gather operation. This is illustrated below.

Calling the transferTo() method causes the device to read data into a kernel read buffer by the DMA engine, as per the previous example. However, with the gather operation, there is no copying between the read buffer and the socket buffer. Instead, the NIC is given a pointer to the read buffer, along with the offset and the length, which is vacuumed up by DMA. At no point is the CPU involved in copying buffers.

Comparisons of the traditional and zero-copy approaches on file sizes ranging from a few megabytes to a gigabyte show performance gains by a factor of two to three in favor of zero-copy. But what’s more impressive is that Kafka achieves this using a plain JVM with no native libraries or JNI code.
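
For completeness, here is a minimal sketch of the transferTo() technique in plain Java NIO, outside of Kafka. The file path and destination address are hypothetical; the point is simply that the application never copies the file contents into its own buffer.

import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySketch {
    public static void main(String[] args) throws Exception {
        try (FileChannel file = FileChannel.open(Path.of("/tmp/payload.bin"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                // transferTo may move fewer bytes than requested, so loop until done.
                long transferred = file.transferTo(position, remaining, socket);
                position += transferred;
                remaining -= transferred;
            }
        }
    }
}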

Avoiding the GC

The heavy use of channels, native buffers, and the page cache has one additional benefit — reducing the load on the garbage collector (GC). For example, running Kafka on a machine with 32 GB of RAM will result in 28–30 GB usable for the page cache, completely outside of the GC’s scope. The difference in throughput is minimal — in the region of several percentage points — as the throughput of a correctly-tuned GC can be quite high, especially when dealing with short-lived objects. The real gains are in the reduction of jitter; by avoiding the GC, the brokers are less likely to experience a pause that may impact the client, extending the end-to-end propagation delay of records.

To be fair, the avoidance of GC is less of a problem now, compared to what it used to be when Kafka was conceived. Modern GCs like Shenandoah and ZGC scale to huge, multi-terabyte heaps, and have tunable worst-case pause times, down to single-digit milliseconds. It is not uncommon these days to see JVM-based applications using large heap-based caches outperform off-heap designs.

Stream parallelism

The efficiency of log-structured I/O is one crucial aspect of performance, mostly affecting writes; Kafka’s treatment of parallelism in the topic structure and the consumer ecosystem is fundamental to its read performance. The combination produces an overall very high end-to-end messaging throughput. Concurrency is ingrained into its partitioning scheme and the operation of consumer groups, which is effectively a load-balancing mechanism within Kafka — distributing partition assignments approximately evenly among the individual consumer instances within the group. Compare this to a more traditional MQ: in an equivalent RabbitMQ setup, multiple concurrent consumers may read from a queue in a round-robin fashion, but in doing so they forfeit the notion of message ordering.

The partitioning mechanism also allows for the horizontal scalability of Kafka brokers. Every partition has a dedicated leader; any nontrivial topic (with multiple partitions) can therefore utilize the entire cluster of brokers for writes. This is yet another point of distinction between Kafka and a message queue; where the latter utilizes clustering for availability, Kafka genuinely balances the load across the brokers for availability, durability, and throughput.

The producer specifies the partition when publishing a record, assuming that you are publishing to a topic with multiple partitions. (One may have a single-partition topic, in which case this is a non-issue.) This may be accomplished either directly — by specifying a partition index, or indirectly — by way of a record key, which deterministically hashes to a consistent (i.e. same every time) partition index. Records sharing the same hash are guaranteed to occupy the same partition. Assuming a topic with multiple partitions, records with a different key will likely end up in different partitions. However, due to hash collisions, records with different hashes may also end up in the same partition. Such is the nature of hashing. If you understand how a hash table works, this is no different.
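
The two partition-selection strategies look like this in a hypothetical producer snippet (the topic name and values are made up): one record nominates a partition index explicitly, the other relies on key hashing.

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionSelectionSketch {
    public static void main(String[] args) {
        // Direct: pin the record to partition 2 of a hypothetical "prices" topic.
        ProducerRecord<String, String> explicit =
                new ProducerRecord<>("prices", 2, "AAPL", "101.5");

        // Indirect: supply only a key; the client hashes "AAPL" to a consistent partition.
        ProducerRecord<String, String> keyed =
                new ProducerRecord<>("prices", "AAPL", "101.5");

        System.out.println(explicit.partition() + " / " + keyed.partition()); // 2 / null (decided at send time)
    }
}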

The actual processing of records is done by consumers, operating within an (optional) consumer group. Kafka guarantees that a partition may only be assigned to at most one consumer within its consumer group. (We say ‘at most’ to cover the case when all consumers are offline.) When the first consumer in a group subscribes to the topic, it will receive all partitions on that topic. When a second consumer subsequently joins, it will get approximately half of the partitions, relieving the first consumer of half of its prior load. This enables you to process an event stream in parallel, adding consumers as necessary (ideally, using an auto-scaling mechanism), providing that you have adequately partitioned your event stream.

Control of record throughput is accomplished in two ways:

  1. The topic partitioning scheme. Topics should be partitioned to maximize the number of independent event sub-streams. In other words, record order should only be preserved where it is absolutely necessary. If any two records are not legitimately related in a causal sense, they shouldn’t be bound to the same partition. This implies the use of different keys, as Kafka will use a record’s key as a hashing source to derive its consistent partition mapping.
  2. The number of consumers in the group. You can increase the number of consumers to match the load of inbound records, up to the number of partitions in the topic. (You can have more consumers if you wish, but the partition count places an upper bound on the number of active consumers that get at least one partition assignment; the remaining consumers will stay idle.) Note that a consumer could be a process or a thread. Depending on the type of workload that the consumer performs, you may be able to employ multiple individual consumer threads or process records in a thread pool. A minimal consumer-group sketch follows this list.
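
The sketch below shows the consumer-group side of this, using hypothetical topic and group names and a broker assumed to be at localhost:9092. Each additional instance of the same program joining the same group.id takes over a share of the partitions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // assumed broker address
        props.put("group.id", "trading-strategy.abc");          // hypothetical group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("prices")); // hypothetical topic
            while (true) {
                // Each member of the group receives records only from its assigned partitions.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}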

If you were wondering whether Kafka is fast, how it achieves its renowned performance characteristics, or if it can scale to your use cases, you should hopefully by now have all the answers you need.

To make things abundantly clear, Kafka is not the fastest (that is, most throughput-capable) messaging middleware — there are other platforms capable of greater throughput — some are software-based and some are implemented in hardware. Nor is it the best throughput-latency compromise — Apache Pulsar is a promising technology that is scalable and achieves a better throughput-latency profile while offering identical ordering and durability guarantees. The rationale for adopting Kafka is that, as a complete ecosystem, it remains unmatched overall. It exhibits excellent performance while offering an environment that is rich and mature, yet still evolving — in spite of its size, Kafka is still growing at an enviable pace.

The designers and maintainers of Kafka have done an amazing job at devising a solution that is performance-oriented at its core. Few of its design elements feel like an afterthought or a bolt-on. From offloading of work to clients to the log-structured persistence on the broker, batching, compression, zero-copy I/O, and stream-level parallelism — Kafka throws down the gauntlet to just about any other message-oriented middleware, commercial or open-source. And most impressively, it does so without compromising on qualities such as durability, record order, and at-least-once delivery semantics.

Kafka is not the simplest of messaging platforms, and there is a fair bit to learn. One must come to grips with the concepts of a total and partial order, topics, partitions, consumers and consumer groups, before comfortably designing and building high-performance event-driven systems. And while the knowledge curve is substantial, the results will certainly be worth your while. If you are keen on taking the proverbial ‘red pill’, read the Introduction to Event Streaming with Kafka and Kafdrop.

Introduction to Event Streaming with Kafka and Kafdrop
26
Mar
2021

Introduction to Event Streaming with Kafka and Kafdrop

Event sourcing, eventual consistency, microservices, CQRS… These are quickly becoming household names in mainstream application development. But do you know what makes them tick? What are the basic building blocks required to assemble complex, business-centric applications from fine-grained services without turning the lot into a big ball of mud?

This article examines a fundamental building block — event streaming. Leading the charge will be Apache Kafka — the de facto standard in event streaming platforms, which we’ll observe through Kafdrop — a feature-packed web UI.

A Brief Intro

Event streaming platforms reside in the broader class of Message-oriented Middleware (MoM) and are similar to traditional message queues and topics but offer stronger temporal guarantees and typically order-of-magnitude performance gains due to log-structured immutability. In simple terms, write operations are mostly limited to sequential appends, which make them fast. Really fast.

Whereas messages in a traditional Message Queue (MQ) tend to be arbitrarily ordered and generally independent of one another, events (or records) in a stream tend to be strongly ordered, often chronologically or causally. Also, a stream persists its records, whereas an MQ will discard a message as soon as it has been read.

For this reason, event streaming tends to be a better fit for Event-Driven Architectures, encompassing event sourcing, eventual consistency, and CQRS concepts. (Of course, there are FIFO message queues too, but the differences between FIFO queues and fully-fledged event streaming platforms are quite substantial, not limited to ordering alone.)

Event streaming platforms are a comparatively recent paradigm within the broader MoM domain. There are only a handful of mainstream implementations available, compared to hundreds of MQ-style brokers, some going back to the 1980s (e.g. Tuxedo). Compared to established standards such as AMQP, MQTT, XMPP, and JMS, there are no equivalent standards in the streaming space.

Event streaming platforms are an active area of continuous research and experimentation. In spite of this, streaming platforms aren’t just a niche concept or an academic idea with a few esoteric use cases; they can be applied effectively to a broad range of messaging and eventing scenarios, routinely displacing their more traditional counterparts.


Architecture Overview

The diagram below offers a brief overview of the Kafka component architecture. While the intention isn’t to indoctrinate you with all of Kafka’s inner workings, some appreciation of its design will go a long way in explaining the key concepts that we will cover shortly.

Kafka Architecture Overview

Kafka is a distributed system comprising several key components:

  • Broker nodes: Responsible for the bulk of I/O operations and durable persistence within the cluster. Brokers accommodate the append-only log files that comprise the topic partitions hosted by the cluster. Partitions can be replicated across multiple brokers for both horizontal scalability and increased durability — these are called replicas. A broker node may act as the leader for certain replicas, while being a follower for others. A single broker node will also be elected as the cluster controller — responsible for the internal management of partition states. This includes the arbitration of the leader-follower roles for any given partition.
  • ZooKeeper nodes: Under the hood, Kafka needs a way to manage the overall controller status in the cluster. Should the controller drop out for whatever reason, there is a protocol in place to elect another controller from the set of remaining brokers. The actual mechanics of controller election, heart-beating, and so forth, are largely implemented in ZooKeeper. ZooKeeper also acts as a configuration repository of sorts, maintaining cluster metadata, leader-follower states, quotas, user information, ACLs, and other housekeeping items. Due to the underlying gossiping and consensus protocol, the number of ZooKeeper nodes must be odd.
  • Producers: These are client applications responsible for appending records to Kafka topics. Because of the log-structured nature of Kafka and the ability to share topics across multiple consumer ecosystems, only producers are able to modify the data in the underlying log files. The actual I/O is performed by the broker nodes on behalf of the producer clients. Any number of producers may publish to the same topic, selecting the partitions used to persist the records.
  • Consumers: These are client applications that read from topics. Any number of consumers may read from the same topic; however, depending on the configuration and grouping of consumers, there are rules governing the distribution of records among the consumers.

Topics, Partitions, Records, and Offsets

A partition is a totally ordered sequence of records and is fundamental to Kafka. A record has an ID — a 64-bit integer offset — and a millisecond-precise timestamp. It may also have a key and a value; both are byte arrays and both are optional. The term “totally ordered” simply means that for any given producer, records will be written in the order they were emitted by the application. If record P was published before Q, then P will precede Q in the partition. (Assuming P and Q share a partition.)

Furthermore, they will be read in the same order by all consumers; P will always be read before Q, for every possible consumer. This ordering guarantee is vital in a large majority of use cases. Published records will generally correspond to some real-life events, and preserving the timeline of these events is often essential.

Note: Kafka uses the term “record,” where others might use “message” or “event.” In this article, we will use the terms interchangeably, depending on the context. Similarly, you might see the term “stream” as a generic substitute for “topic.”

There is no recognized ordering across producers. If two (or more) producers emit records simultaneously, those records may materialize in arbitrary order. That said, this order will still be observed uniformly across all consumers.

A record’s offset uniquely identifies it in the partition. The offset is a strictly monotonically-increasing integer in a sparse address space, meaning that each successive offset is always higher than its predecessor and there may be varying gaps between neighboring offsets. Gaps might legitimately appear if compaction is enabled or as a result of transactions; we don’t need to delve into the details at this stage. Suffice it to say that offsets need not be contiguous.

Your application shouldn’t attempt to literally interpret an offset or guess what the next offset might be. It may, however, infer the relative order of any record pair based on their offsets, sort the records by their offset, and so forth.

The diagram below shows what a partition looks like on the inside.

     start of partition
+--------+-----------------+
|0..00000|First record     |
+--------+-----------------+
|0..00001|Second record    |
+--------+-----------------+
|0..00002|Third record     |
+--------+-----------------+
|0..00003|Fourth record    |
+--------+-----------------+
|0..00007|Fifth record     |
+--------+-----------------+
|0..00008|Sixth record     |
+--------+-----------------+
|0..00010|Seventh record   |
+--------+-----------------+
            ...
+--------+-----------------+
|0..56789|Last record      |
+--------+-----------------+
       end of partition

The beginning offset, also called the low-water mark, is the first message that will be presented to a consumer. Due to Kafka’s bounded retention, this is not necessarily the first message that was published. Records may be pruned on the basis of time and/or partition size. When this occurs, the low-water mark will appear to advance, and records earlier than the low-water mark will be truncated.

Conversely, the high-water mark is the offset immediately following the last record in the partition, also known as the end offset. It is the offset that will be assigned to the next record that will be published. It is not the offset of the last record.

A topic is a logical composition of partitions. A topic may have one or more partitions, and a partition must be a part of exactly one topic. Topics are fundamental to Kafka, allowing for both parallelism and load balancing.

Earlier, we said that partitions exhibit total order. Because partitions within a topic are mutually independent, the topic is said to exhibit partial order. In simple terms, this means that certain records may be ordered in relation to one another while being unordered with respect to certain other records. The concepts of total and partial order, while sounding somewhat academic, are hugely important in the construction of performant event streaming pipelines. They enable us to process records in parallel where we can, while maintaining order where we must. We’ll explore the concepts of record order, consumer parallelism, and topic sizing in a short while.

Example: Publishing Messages

Let’s put some of this theory into practice. We are going to spin up a pair of Docker containers — one for Kafka and another for Kafdrop. Rather than launching them individually, we’ll use Docker Compose.

Create a docker-compose.yaml file in a directory of your choice, containing the following:

version: "2"
services:
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
    depends_on:
      - "kafka"
  kafka:
    image: obsidiandynamics/kafka
    restart: "no"
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
Note: We’re using the obsidiandynamics/kafka image for convenience because it neatly bundles Kafka and ZooKeeper into a single image. If you wanted to, you could replace this with images from Confluent or Wurstmeister, but then you’d have to wire it all up properly. The obsidiandynamics/kafka image does all this for you, so it’s highly recommended for beginners (and lazy pros).

Then, start it with docker-compose up. Once it boots, navigate to localhost:9000 in your browser. You should see the Kafdrop landing screen.

Kafdrop landing page

You should see our single-broker cluster. It’s a promising start, but there are no topics. Not a problem; let’s create a topic and publish some messages using Kafka’s command-line tools. Conveniently, we already have a Kafka image running as part of our docker-compose stack, so we can shell into it to use the built-in CLI tools.

docker exec -it kafka-kafdrop_kafka_1 bash

This gets you into a Bash shell. The tools are in the /opt/kafka/bin directory, so let’s cd into it:

cd /opt/kafka/bin

Create a topic named streams-intro with three partitions:

./kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --partitions 3 --replication-factor 1 \
    --topic streams-intro

Switching back to Kafdrop, we should now see the new topic in the list.

Kafdrop topics list

Time to publish stuff. We are going to use the kafka-console-producer tool:

./kafka-console-producer.sh --broker-list localhost:9092 \
    --topic streams-intro --property "parse.key=true" \
    --property "key.separator=:"

Note: kafka-topics uses the --bootstrap-server argument to configure the Kafka broker list, while kafka-console-producer uses the --broker-list argument for the same purpose. Also, --property arguments are largely undocumented; be prepared to Google your way around.

Records are separated by newlines. The key and the value parts are delimited by colons, as indicated by the key.separator property. For the sake of an example, type in the following (a copy-paste will do):

foo:first message
foo:second message
bar:first message
foo:third message
bar:second message

Press CTRL+D when done. Then, switch back to Kafdrop and click on the streams-intro topic. You’ll see an overview of the topic, along with a detailed breakdown of the underlying partitions:

Kafdrop topic overview

Let’s pause for a moment and dissect what’s been done. We created a topic with three partitions. We then published five records using two unique keys — foo and bar. Kafka uses keys to map records to partitions, such that all records with the same key will always appear on the same partition. Handy, but also important because it lets the publisher dictate the precise order of records. We’ll discuss key hashing and partition assignments in more detail later; in the meanwhile, sit back and enjoy the ride.

Looking at the partitions table, partition #0 has the first and last offsets at zero and two, respectively. Partition #2 has them at zero and three, while partition #1 appears to be blank. Clicking on #0 in the Kafdrop web UI sends us to a topic viewer:

Kafdrop topic viewer

We can see the two records published under the bar key. Note, they are completely unrelated to the foo records. Other than being collated within the same topic, there is nothing that binds records across partitions.

Note: In case you were wondering, the arrow to the left of the message lets you expand and pretty-print JSON-encoded messages. As our examples didn’t use JSON, there’s nothing to pretty-print.

It can be said without exaggeration that Kafka’s built-in tooling is an abomination. There is no consistency in the naming of command arguments and the simple act of publishing keyed messages requires you to jump through hoops — passing in obscure, undocumented properties. The usability of the built-in tools is a well-known heartache within the Kafka community. This is a real shame. It’s like buying a Ferrari, only to have it delivered with plastic hub caps. Fortunately, there are alternatives — both commercial and open source — that can fill the glaring gaps in tooling and observability.

Consumers and Consumer Groups

So far we have learned that producers emit records into the stream; these records are organized into nicely ordered partitions. Kafka’s pub-sub topology adheres to a flexible multipoint-to-multipoint model, meaning that there may be any number of producers and consumers simultaneously interacting with a stream. Depending on the actual solution context, stream topologies may also be point-to-multipoint, multipoint-to-point, and point-to-point. It’s about time we looked at how records are consumed.

A consumer is a process or a thread that attaches to a Kafka cluster via a client library. (One is available for most languages.) A consumer generally, but not necessarily, operates as part of an encompassing consumer group. The group is specified by the group.id property. Consumer groups are effectively a load-balancing mechanism within Kafka — distributing partition assignments approximately evenly among the individual consumer instances within the group.

When the first consumer in a group joins the topic, it will receive all partitions in that topic. When a second consumer subsequently joins, it will get approximately half of the partitions, relieving the first consumer of half of its prior load. The process runs in reverse when consumers leave (by disconnecting or timing out) — the remaining consumers will absorb a greater number of partitions.

So, a consumer siphons records from a topic, pulling from the share of partitions that have been assigned to it by Kafka, alongside the other consumers in its group. As far as load-balancing goes, this should be fairly straightforward. But here’s the kicker — the act of consuming a record does not remove it. This might seem contradictory at first, especially if you associate the act of consuming with depletion. (If anything, a consumer should have been called a ‘reader’, but let’s not dwell on the choice of terminology.)

The simple fact is, consumers have absolutely no impact on topics and their partitions. A topic is an append-only ledger that may only be mutated by the producer, or by Kafka itself (as part of compaction or cleanup). Consumers are “cheap,” so you can have quite a number of them tail the logs without stressing the cluster. This is yet another point of distinction between an event stream and a traditional message queue, and it’s a crucial one.

A consumer internally maintains an offset that points to the next record in a partition, advancing the offset for every successive read. When a consumer first subscribes to a topic, it may elect to start at either the head-end or the tail-end of the topic. This behavior is controlled by setting the auto.offset.reset property to one of latest, earliest, or none. In the last case, an exception will be thrown if no previous offset exists for the consumer group.
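
A minimal sketch of that setting, assuming a local broker and a hypothetical group name; everything else is left at its default.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetResetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "streams-intro-reader");      // hypothetical group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // "earliest" starts a brand-new group from the beginning of each partition;
        // "latest" (the default) starts from the tail; "none" throws if no offset exists.
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The reset policy only applies when the group has no committed offset yet.
        }
    }
}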

Consumers retain their offset state vector locally. Since consumers across different consumer groups do not interfere, there may be any number of them reading concurrently from the same topic. Consumers run at their own pace — a slow or backlogged consumer has no impact on its peers.

To illustrate this concept, consider a scenario involving a topic with two partitions. Two consumer groups, A and B, are subscribed to the topic. Each group has three instances, the consumers being named A1, A2, A3, B1, B2, and B3. The diagram below illustrates how the two groups might share the topic and how the consumers advance through the records independently of one another.

               Partition 0                       Partition 1
               +--------+                        +--------+
               |0..00000|                        |0..00000|
               +--------+                        +--------+
               |0..00001| <= consumer A2         |0..00001|
               +--------+                        +--------+
               |0..00002|                        |0..00002| <= consumer A1
               +--------+                        +--------+
               |0..00003|                        |0..00003|
               +--------+                        +--------+
                  ...                               ...
               +--------+                        +--------+
               |0..00008| <= consumer B3         |0..00008| <= consumer B2
               +--------+                        +--------+
               |0..00009|                        |0..00009|
               +--------+                        +--------+
producer P1 => |0..00010|                        |0..00010|
               +--------+                        +--------+
                                  producer P1 => |0..00011|
                                                 +--------+

Look carefully and you’ll notice something is missing. Consumers A3 and B1 aren’t there. That’s because Kafka guarantees that a partition may only be assigned to at most one consumer within its consumer group. (We say ‘at most’ to cover the case when all consumers are offline.) Because there are three consumers in each group, but only two partitions, one consumer will remain idle — waiting for another consumer in its respective group to depart before being assigned a partition.

In this manner, consumer groups are not only a load-balancing mechanism, but also a fence-like exclusivity control, used to build highly performant pipelines without sacrificing safety, particularly when there is a requirement that a record may only be handled by one thread or process at any given time.

Consumer groups are also used to ensure availability. By periodically pulling records from a topic, the consumer implicitly signals to the cluster that it’s in a ‘healthy’ state, thereby extending the lease over its partition assignment. However, should the consumer fail to read again within the allowable deadline, it will be deemed faulty and its partitions will be reassigned — apportioned among the remaining ‘healthy’ consumers within its group. This deadline is controlled by the max.poll.interval.ms consumer client property, set to five minutes by default.

To use a transportation analogy, a topic is like a highway, while a partition is a lane. A record is the equivalent of a car, and its occupants correspond to the record’s value. Several cars can safely travel on the same highway, providing they keep to their lane. Cars sharing the same lane ride in a sequence, forming a queue. Now, suppose each lane leads to an off-ramp, diverting its traffic to some location. If one off-ramp gets banked up, other off-ramps may still flow smoothly.

It’s precisely this highway-lane metaphor that Kafka exploits to achieve its end-to-end throughput, easily reaching millions of records per second on commodity hardware. When creating a topic, one can choose the partition count — the number of lanes, if you will.

The partitions are divided approximately evenly among the individual consumers in a consumer group, with a guarantee that no partition will be assigned to two (or more) consumers at the same time, providing that these consumers are part of the same consumer group. Referring to our analogy, a car will never end up in two off-ramps simultaneously; however, two lanes might conceivably lead to the same off-ramp.

Note: A topic may be resized after creation by increasing the number of partitions. It is not possible, however, to decrease the partition count without recreating the topic.

Records correspond to events, messages, commands, or any other streamable content. Precisely how records are partitioned is left to the discretion of the producer(s). A producer may explicitly assign a partition index when publishing a record, although this approach is rarely used. A much more common approach is to assign a key to a record, as we have done in our earlier example. The key is completely opaque to Kafka. In other words, Kafka doesn’t attempt to interpret the contents of the key, treating it as an array of bytes. These bytes are hashed to derive a partition index, using a consistent hashing technique.

Records sharing the same hash are guaranteed to occupy the same partition. Assuming a topic with multiple partitions, records with a different key will likely end up in different partitions. However, due to hash collisions, records with different hashes may also end up in the same partition. Such is the nature of hashing. If you understand how a hash table works, this is no different.

Producers rarely care which specific partition the records will map to, only that related records end up in the same partition and that their order is preserved. Similarly, consumers are largely indifferent to their assigned partitions, so long as they receive the records in the same order as they were published, and their partition assignment does not overlap with that of any other consumer in their group.

Committing Offsets

We already said that consumers maintain an internal state with respect to their partition offsets. At some point, that state must be shared with Kafka, so that when a partition is reassigned, the new consumer can resume processing from where the outgoing consumer left off. Similarly, if the consumers were to disconnect, upon reconnection they would ideally skip over those records that have already been processed.

Persisting the consumer state back to the Kafka cluster is called committing an offset. Typically, a consumer will read a record (or a batch of records) and commit the offset of the last record, plus one. If a new consumer takes over the topic, it will commence processing from the last committed offset, hence the plus-one step is essential. (Otherwise, the last processed record would be handled a second time.)

Fun fact: Kafka employs a recursive approach to managing committed offsets, elegantly utilising itself to persist and track offsets. When an offset is committed, Kafka will publish a binary record on the internal __consumer_offsets topic. The contents of this topic are compacted in the background, creating an efficient event store that progressively reduces to only the last known commit points for any given consumer group.

Controlling the point when an offset is committed provides a great deal of flexibility around delivery guarantees, handing Kafka yet another trump card. The term ‘delivery’ assumes not just reading a record, but the full processing cycle, complete with any side-effects. One can shift from an at-most-once to an at-least-once delivery model by simply moving the commit operation from a point before the processing of a record commences to a point sometime after the processing is complete. With this model, should the consumer fail midway through processing a record, the record will be re-read following the subsequent partition reassignment.

By default, a Kafka consumer will automatically commit offsets every five seconds, regardless of whether the consumer has finished processing the record. Often, this is not what you want, as it may lead to mixed delivery semantics. For example, in the event of consumer failure, some records might be delivered twice, while others might not be delivered at all. To enable manual offset committing, set the enable.auto.commit property to false.
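
A minimal sketch of the at-least-once pattern described above, with auto-commit disabled and the offset committed only after the record has been handled. The topic, group, broker address, and the handleRecord method are all hypothetical.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AtLeastOnceConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "order-execution");           // hypothetical group name
        props.put("enable.auto.commit", "false");           // take control of committing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    handleRecord(record); // process first...
                    // ...then commit the offset of the record, plus one.
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    private static void handleRecord(ConsumerRecord<String, String> record) {
        System.out.println("processed " + record.value()); // stand-in for real processing
    }
}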

Note: There are a few gotchas like this in Kafka. Pay close attention to the (producer and consumer) client properties in the official Kafka documentation, particularly to the stated defaults. Don’t assume for a moment that the defaults are sensible, insofar as they ought to favour safety over other competing qualities. Kafka defaults tend to be optimised for performance, and will need to be explicitly overridden on the client when safety is a critical objective. Fortunately, setting the properties to ensure safety has only a minor impact on performance — Kafka is still a beast. Remember the first rule of optimisation: don’t do it. Kafka would have been even better had its creators given this more thought.

Getting offset committing right can be tricky, and it routinely catches out beginners. A committed offset implies that all records below that offset have been dealt with by the consumer. When designing at-least-once or exactly-once applications, an offset should only be committed when the application has dealt with the record in question and all records before it.

In other words, the record has been processed to the point that any actions that would have resulted from the record have been carried out and finalized. This may include calling other APIs, updating a database, committing transactions, persisting the record’s payload, or publishing more records. Stated otherwise, if the consumer were to fail after committing the record, then not ever seeing this record again must not be detrimental to its correctness.

In the at-least-once (and, by extension, the exactly-once) scenario, a typical consumer implementation will commit its offset linearly, in tandem with the processing of the records. That is, read a record, commit it (plus one), read the next, commit it (plus one), and so on. A common tactic is to process a batch of records concurrently (where this makes sense), using a thread pool, and only commit the last record’s offset when the entire batch is done. The commit process in Kafka is very efficient; the client library sends commit requests asynchronously to the cluster using an in-memory queue, without blocking the consumer. The client application can register an optional callback, notifying it when the commit has been acknowledged by the cluster.
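
The asynchronous variant with a completion callback might look like the sketch below; the configuration mirrors the earlier hypothetical consumer, and commitAsync here commits the offsets returned by the most recent poll.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AsyncCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "trading-strategy.abc");      // hypothetical group name
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("prices")); // hypothetical topic
            while (true) {
                consumer.poll(Duration.ofMillis(500)).forEach(record ->
                        System.out.println("processed " + record.value()));
                // Non-blocking commit of everything returned by the last poll();
                // the callback fires once the cluster acknowledges the commit.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("commit failed: " + exception.getMessage());
                    }
                });
            }
        }
    }
}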

The consumer group is a somewhat understated concept that is pivotal to the versatility of an event streaming platform. By simply varying the affinity of consumers with their groups, one can arrive at vastly different distribution topologies — from a topic-like, pub-sub behavior to an MQ-style, point-to-point model. Because records are never truly consumed (the advancing offset only creates the illusion of consumption), one can concurrently superimpose disparate distribution topologies over a single event stream.

Free Consumers

Consumer groups are completely optional; a consumer does not need to be encompassed in a consumer group to pull messages from a topic. A free consumer omits the group.id property. Doing so allows it to operate under relaxed rules, entirely transferring the responsibility for consumer management to the application.

Note: The use of the term ‘free’ to denote a consumer without an encompassing group is not part of the standard Kafka nomenclature. As Kafka lacks a canonical term to describe this, the term ‘free’ was adopted here.

Free consumers do not subscribe to a topic. Instead, the consuming application is responsible for manually assigning a set of topic-partitions to the consumer, individually specifying the starting offset for each topic-partition pair. Free consumers do not commit their offsets to Kafka; it is up to the application to track the progress of such consumers and persist their state as appropriate, using a datastore of their choosing. The concepts of automatic partition assignment, rebalancing, offset persistence, partition exclusivity, consumer heart-beating and failure detection, and other so-called niceties accorded to consumer groups cease to exist in this mode.
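
A minimal sketch of a ‘free’ consumer, manually assigning a single hypothetical topic-partition and seeking to an explicit offset. Note the absence of group.id and of any commit calls.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class FreeConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address; no group.id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("streams-intro", 0); // manual assignment
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 0L); // start from an offset chosen by the application
            consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                    System.out.println(record.offset() + ": " + record.value()));
            // No offsets are committed; tracking progress is the application's responsibility.
        }
    }
}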

Free consumers are not observed in the wild as often as their grouped counterparts. There are predominantly two use cases where a free consumer is an appropriate choice. The first, is when you genuinely need full control of the partition assignment scheme and/or you require an alternative place to store consumer offsets. This is very rare.

Needless to say, it’s also very difficult to implement correctly, given the multitude of scenarios one must account for. The second, more commonly seen use case, is when you have a stateless or ephemeral consumer that needs to monitor a topic. For example, you might be interested in tailing a topic to identify specific records, or just as a debugging tool. You might only care about records that were published when your stateless consumer was online, so concerns such as persisting offsets and resuming from the last processed record are completely irrelevant.

A good example of where this is used routinely is the Kafdrop web UI, which we’ve already seen. When you click on a topic to view the messages, Kafdrop creates a free consumer and assigns the requested partition to it, reading the records from the supplied offsets. Navigating to a different topic or partition will reset the consumer, discarding any prior state.

The illustration below outlines the relationship between producers, topics, partitions, consumers, and consumer groups.

+----------+          +----------+
|PRODUCER 1|          |PRODUCER 2|
+-----v----+          +-----v----+
      |                     |
      |                     |
      |                     |
 +----V---------------------V-----------------------------------------+
 |                            >>> TOPIC >>>                           |
 |            +---------------------------------------------------+   |
 | PARTITION 0|record 0..00|record 0..01|record 0..02|record 0..03|   |
 |            +--------------------v------------------------------+   |
 |                                 |                                  |
 |            +--------------------|------------------------------+   |
 | PARTITION 1|record 0..00|       |    |record 0..02|record 0..03|   |
 |            +--------------------|-------------v----------------+   |
 |                                 |             |                    |
 +----------v----------------------|-------------|--------------------+
            |                      |             |
            |                      |             |
            |                      |             |
            |              +-------|-------------|----------------------+
            |              |       |             |                      |
       +----V-----+        | +-----V----+   +----V-----+   +----------+ |
       |CONSUMER 1|        | |CONSUMER 2|   |CONSUMER 3|   |CONSUMER 4| |
       +----------+        | +----------+   +----------+   +----------+ |
                           |               CONSUMER GROUP               |
                           +--------------------------------------------+

The key takeaways are:

  • Topics are subdivided into partitions, each forming an independent, totally-ordered sequence within a wider, partially-ordered stream.
  • Multiple producers are able to publish to a topic, picking a partition at will. This may be accomplished either directly, by specifying a partition index, or indirectly, by way of a record key, which deterministically hashes to a consistent partition index. (In the diagram above, both Producer 1 and Producer 2 publish to the same topic.)
  • Partitions in a topic can be load-balanced across a population of consumers in a consumer group, allocating partitions approximately evenly among the members of that group. (Consumer 2 and Consumer 3 each get one partition.)
  • A consumer in a group is not guaranteed a partition assignment. Where the group’s population outnumbers the partitions, some consumers will remain idle until this balance equalizes or tips in favor of the other side. (Consumer 4 remains partition-less.)
  • Partitions may be manually assigned to free consumers. If necessary, an entire topic may be assigned to a single free consumer — this is done by individually assigning all partitions. (Consumer 1 can be freely assigned any partition.)

Exactly-Once Delivery

When contrasting at-least-once with at-most-once delivery semantics, an often-asked question is: Why can’t we have it exactly once?

Without delving into the academic details, which involve conjectures and impossibility proofs, it is sufficient to say that exactly-once semantics are not possible without collaboration with the consumer application. What does this mean in practice?

Consumers in event streaming applications must be idempotent. In other words, processing the same record repeatedly should have no net effect on the consumer ecosystem. If a record has no additive effects, the consumer is inherently idempotent. (For example, if the consumer simply overwrites an existing database entry with a new one, then the update is naturally idempotent.) Otherwise, the consumer must check whether a record has already been processed, and to what extent, prior to processing a record. The combination of at-least-once delivery and consumer idempotence collectively leads to exactly-once semantics.
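
A sketch of the idempotence check, using an in-memory set of processed record IDs purely for illustration; a real application would typically persist this state (or derive it from the target system) atomically with the side-effect.

import java.util.HashSet;
import java.util.Set;

public class IdempotentHandlerSketch {
    private final Set<String> processedIds = new HashSet<>(); // stand-in for a durable store

    public void handle(String recordId, String payload) {
        if (!processedIds.add(recordId)) {
            return; // already processed: a redelivery has no net effect
        }
        applySideEffect(payload);
    }

    private void applySideEffect(String payload) {
        System.out.println("applied " + payload); // e.g. update a database row
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        handler.handle("order-42", "BUY 100 AAPL");
        handler.handle("order-42", "BUY 100 AAPL"); // duplicate delivery is ignored
    }
}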

Example: A Trading Platform

With all this theory looming over us like Kubrick’s Monolith, it would be inappropriate to conclude without offering the reader a practical scenario.

Let’s say you were looking for specific price patterns in listed stocks, emitting trading signals when a particular pattern is identified. There are a large number of stocks, and understandably you’d like them processed in parallel. However, the time series for any given ticker code must be processed sequentially on a single consumer.

Kafka makes this use case, and others like it, almost trivial to implement. We would create a pair of topics: prices for the raw price data, and orders for any resulting orders. We can be fairly generous with our partition counts, as the nature of the data gives us ample opportunities for parallelism.

At the feed source, we could publish a record for each price on the prices topic, keyed by the ticker code. Kafka’s automatic partition assignment will ensure that every ticker code is handled by (at most) one consumer in its group. The consumer instances are free to scale in and out to match the processing load. Consumer groups should be meaningfully named, ideally reflecting the purpose of the consuming application. A good example might be trading-strategy.abc, for a fictitious trading strategy named ‘ABC’.
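
At the feed source, publishing a price keyed by its ticker code might look like the sketch below. The prices topic name comes from the example above, while the broker address and the values are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PriceFeedSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by ticker code keeps each ticker's time series on one partition,
            // and therefore on (at most) one consumer in the trading-strategy group.
            producer.send(new ProducerRecord<>("prices", "AAPL", "{\"bid\":101.4,\"ask\":101.6}"));
            producer.send(new ProducerRecord<>("prices", "MSFT", "{\"bid\":250.1,\"ask\":250.3}"));
        }
    }
}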

Once a price pattern is identified by the consumer, it can publish another message — the order request — on the orders topic. We’ll muster up another consumer group — order-execution — responsible for reading the orders and forwarding them to the broker.

In this simple example, we have created an end-to-end trading pipeline that is entirely event-driven and highly scalable — at least theoretically, assuming there are no other bottlenecks. We can dynamically add more processing nodes to the individual stages to cope with the increased load where it’s called for.

Now, let’s spice things up a bit. Suppose you need several trading strategies operating concurrently, driven by a common data feed. Furthermore, the trading strategies will be developed by different teams; the objective being to decouple these implementations as much as possible, allowing the teams to operate autonomously — develop and deploy at their individual cadence, perhaps even using different programming languages and tool-chains. That said, you’d ideally want to reuse as much of what’s already been written. So, how would we pull this off? 

Trading Platform

Kafka’s flexible multipoint-to-multipoint pub-sub architecture combines stateful consumption with broadcast semantics. Using distinct consumer groups, Kafka allows disparate applications to share input topics, processing events at their own pace. The second trading strategy would need a dedicated consumer group — trading-strategy.xyz — applying its specific business logic to the common pricing stream, publishing the resulting orders to the same orders topic. In this fashion, Kafka enables you to construct modular event processing pipelines from discrete elements that are readily reusable and composable.

Note: In the days of service buses and traditional ‘enterprisey’ message brokers, before event sourcing entered the mainstream, you would have had to choose between persistent message queues or transient broadcast topics. In our example, you would likely have created multiple FIFO queues, using the fan-out pattern. Because Kafka generalises pub-sub topics and persistent message queues into a unified model, a single source topic can power a diverse range of consumers without incurring duplication.

In Conclusion

Event streaming platforms are a highly effective building block in the construction of modular, loosely-coupled, event-driven applications. Within the world of event streaming, Kafka has solidified its position as the go-to open-source solution that is both amazingly flexible and highly performant. Concurrency and parallelism are at the heart of Kafka’s architecture, forming partially-ordered event streams that can be load-balanced across a scalable consumer ecosystem. A simple reconfiguration of consumers and their encompassing groups can bring about vastly different event distribution and processing semantics; shifting the offset commit point can invert the delivery guarantee from an at-most-once to an at-least-once model.

Of course, Kafka isn’t without its flaws. The tooling is sub-par, to put it mildly; most Kafka practitioners have long abandoned the out-of-the-box CLI utilities in favour of other open-source tools such as Kafdrop and Kafkacat, and third-party commercial offerings like Kafka Tool. The breadth of Kafka’s configuration options is overwhelming, with defaults that are riddled with gotchas, ready to shock the unsuspecting first-time user.

All in all, Kafka represents a paradigm shift in how we architect and build complex systems. Its benefits go beyond the superfluous, and they dwarf any of the niggles that are bound to exist in a technology that has undergone such aggressive adoption. Crucially, it paves the way for further progress in its space; Apache Pulsar is a prime example of an alternative platform that has improved on much of Kafka’s shortcomings, yet owes a great deal to its predecessor for laying the cornerstone and bringing the genre to the mainstream.

Building Restful APIs with Kotlin, Spring Boot, MySQL, JPA and Hibernate
05
Mar
2021

Building Restful APIs with Kotlin, Spring Boot, MySQL, JPA and Hibernate

Kotlin has gained a lot of popularity in recent times due to its productivity features and first-class support in Android.

Owing to the increasing popularity of Kotlin, Spring framework 5 has also introduced a dedicated support for Kotlin in Spring applications.

In this article, you’ll learn how to build a Restful CRUD API with Kotlin and Spring Boot 2.x, which is based on Spring Framework 5.

So stay tuned!

What will we build?

In this blog post, we’ll build Restful APIs for a mini blog application. The blog has a list of Articles. We’ll write APIs for creating, retrieving, updating and deleting an Article. An Article has an id, a title and some content.

We’ll use MySQL as our data source and JPA & Hibernate to access the data from the database.

All right, let’s now create the application.

Creating the Application

We’ll use the Spring Initializr web tool to bootstrap our application. Follow the steps below to generate the application:

  1. Go to http://start.spring.io
  2. Select Kotlin in the language section.
  3. Enter Artifact as kotlin-demo
  4. Add Web, JPA, and MySQL dependencies.
  5. Click Generate to generate and download the project.
Kotlin Spring Boot Restful CRUD API Example

Once the project is generated, unzip it and import it into your favorite IDE. Here is the project’s directory structure for your reference.

Kotlin Spring Boot Restful CRUD API Example Directory Structure

Configure MySQL

We’ll need to configure the MySQL database URL, username, and password so that Spring Boot can create a data source.

Open src/main/resources/application.properties file and add the following properties to it –

## Spring DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties)
spring.datasource.url = jdbc:mysql://localhost:3306/kotlin_demo_app?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false
spring.datasource.username = root
spring.datasource.password = root


## Hibernate Properties

# The SQL dialect makes Hibernate generate better SQL for the chosen database
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5InnoDBDialect

# Hibernate ddl auto (create, create-drop, validate, update)
spring.jpa.hibernate.ddl-auto = update

Please don’t forget to change spring.datasource.username and spring.datasource.password as per your MySQL installation.

Note that, I’ve set spring.jpa.hibernate.ddl-auto property to update. This property updates the database schema whenever you create or modify the domain models in your application.

Creating the Domain Model

Let’s now create the Article domain entity. Create a new package called model inside com.example.kotlindemo package, and then create a new Kotlin file called Article.kt with the following contents –

package com.example.kotlindemo.model

import javax.persistence.Entity
import javax.persistence.GeneratedValue
import javax.persistence.GenerationType
import javax.persistence.Id
import javax.validation.constraints.NotBlank

@Entity
data class Article (
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0,

    @get: NotBlank
    val title: String = "",

    @get: NotBlank
    val content: String = ""
)

The entity class is small and concise, right? That’s because a Kotlin class doesn’t need getters and setters like Java. Moreover, I have used a data class here. A data class automatically generates equals(), hashCode(), toString() and copy() methods.

Note that, I’ve assigned a default value for all the fields in the Article class. This is needed because Hibernate requires an entity to have a no-arg constructor.

Assigning default values to all the member fields lets Hibernate instantiate an Article without passing any argument. It works because Kotlin supports Default Arguments 🙂

Creating the Repository

Let’s now create the repository for accessing the data from the database. First, create a package called repository inside com.example.kotlindemo package, and then create a Kotlin file named ArticleRepository.kt with the following contents –

package com.example.kotlindemo.repository

import com.example.kotlindemo.model.Article
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.stereotype.Repository

@Repository
interface ArticleRepository : JpaRepository<Article, Long>

That’s all we need to do here. Since we’ve extended ArticleRepository from the JpaRepository interface, all the CRUD methods for the Article entity are readily available to us. Spring Boot automatically plugs in a default implementation of JpaRepository called SimpleJpaRepository at runtime.

Creating the Controller Endpoints

Finally, let’s create the controller endpoints for all the CRUD operations on the Article entity.

First, create a new package called controller inside the com.example.kotlindemo package, and then create a new Kotlin file called ArticleController.kt inside the controller package with the following contents –

package com.example.kotlindemo.controller

import com.example.kotlindemo.model.Article
import com.example.kotlindemo.repository.ArticleRepository
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import java.util.*
import javax.validation.Valid

@RestController
@RequestMapping("/api")
class ArticleController(private val articleRepository: ArticleRepository) {

    @GetMapping("/articles")
    fun getAllArticles(): List<Article> =
            articleRepository.findAll()


    @PostMapping("/articles")
    fun createNewArticle(@Valid @RequestBody article: Article): Article =
            articleRepository.save(article)


    @GetMapping("/articles/{id}")
    fun getArticleById(@PathVariable(value = "id") articleId: Long): ResponseEntity<Article> {
        return articleRepository.findById(articleId).map { article -> 
            ResponseEntity.ok(article)
        }.orElse(ResponseEntity.notFound().build())
    }

    @PutMapping("/articles/{id}")
    fun updateArticleById(@PathVariable(value = "id") articleId: Long,
                          @Valid @RequestBody newArticle: Article): ResponseEntity<Article> {

        return articleRepository.findById(articleId).map { existingArticle ->
            val updatedArticle: Article = existingArticle
                    .copy(title = newArticle.title, content = newArticle.content)
            ResponseEntity.ok().body(articleRepository.save(updatedArticle))
        }.orElse(ResponseEntity.notFound().build())

    }

    @DeleteMapping("/articles/{id}")
    fun deleteArticleById(@PathVariable(value = "id") articleId: Long): ResponseEntity<Void> {

        return articleRepository.findById(articleId).map { article  ->
            articleRepository.delete(article)
            ResponseEntity<Void>(HttpStatus.OK)
        }.orElse(ResponseEntity.notFound().build())

    }
}

The controller defines APIs for all the CRUD operations. I have used Kotlin’s functional style syntax in all the methods to make them short and concise.
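
As a side note, recent Spring Data releases ship a findByIdOrNull Kotlin extension, which lets you replace the Optional-based lookup with safe calls and the elvis operator. Here is a sketch of the same GET handler in that style, as a drop-in alternative inside ArticleController (assuming the extension is available in your Spring Data version) –

import org.springframework.data.repository.findByIdOrNull

@GetMapping("/articles/{id}")
fun getArticleById(@PathVariable(value = "id") articleId: Long): ResponseEntity<Article> =
        articleRepository.findByIdOrNull(articleId)
                ?.let { ResponseEntity.ok(it) }       // article found: 200 with body
                ?: ResponseEntity.notFound().build()  // no article: 404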

Running the Application

You can run the application by typing the following command in the terminal –

mvn spring-boot:run

The application will start at Spring Boot’s default port 8080.

Exploring the Rest APIs

1. POST /api/articles – Create an Article

curl -i -H "Content-Type: application/json" -X POST \
-d '{"title": "How to learn Spring framework", "content": "Resources to learn Spring framework"}' \
http://localhost:8080/api/articles

# Output
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 06 Oct 2017 03:25:59 GMT

{"id":1,"title":"How to learn Spring framework","content":"Resources to learn Spring framework"}

2. GET /api/articles – Get all Articles

curl -i -H 'Accept: application/json' http://localhost:8080/api/articles

# Output
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 06 Oct 2017 03:25:29 GMT

[{"id":1,"title":"How to learn Spring framework","content":"Resources to learn Spring framework"}]

3. GET /api/articles/{id} – Get an Article by id

curl -i -H 'Accept: application/json' http://localhost:8080/api/articles/1

# Output
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 06 Oct 2017 03:27:51 GMT

{"id":1,"title":"How to learn Spring framework","content":"Resources to learn Spring framework"}

4. PUT /api/articles/{id} – Update an Article

curl -i -H "Content-Type: application/json" -X PUT \
-d '{"title": "Learning Spring Boot", "content": "Some resources to learn Spring Boot"}' \
http://localhost:8080/api/articles/1

# Output
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 06 Oct 2017 03:33:15 GMT

{"id":1,"title":"Learning Spring Boot","content":"Some resources to learn Spring Boot"}

5. DELETE /api/articles/{id} – Delete an Article

curl -i -X DELETE http://localhost:8080/api/articles/1

# Output
HTTP/1.1 200 
Content-Length: 0
Date: Fri, 06 Oct 2017 03:34:22 GMT

Conclusion

That’s all, folks! In this article, you learned how to use Kotlin with Spring Boot to build RESTful web services.

You can find the entire code for the application that we built in this article in my GitHub repository. Consider giving the project a star on GitHub if you find it useful.

Thank you for reading folks! See you next time 🙂