Интеграция с Apache Cassandra. Создаем микросервис с Cassandra и Kafka

Уровень сложностиСредний
Время на прочтение66 мин
Количество просмотров3.1K
Автор статьи: Рустем Галиев

IBM Senior DevOps Engineer & Integration Architect

Привет Хабр!

Сегодня мы узнаем, как написать микросервис, который сохраняет данные в Apache Cassandra с помощью Java-драйвера DataStax и генерирует события для изменений данных в Apache Kafka.

Этот пост основан на дизайне простого микросервиса для управления данными бронирования отелей, который называется Reservation Service. Вы можете выполнить серию упражнений по записи и чтению данных в Cassandra с помощью службы резервирования в наборе «Cassandra: разработка приложений с помощью Java-драйвера DataStax». Исходный код службы бронирования доступен на GitHub.

Служба бронирования использует модель данных бронирования отелей, которая включает приведенные ниже таблицы.

Cassandra и Kafka часто используются вместе в микросервисных архитектурах, как показано ниже. В этом дизайне служба бронирования получает запрос API для выполнения некоторых действий, таких как создание, обновление или удаление бронирования. После сохранения изменения в Cassandra служба резервирования отправляет сообщение в топик reservation в кластере Kafka.

Другие службы используют топик reservation и выполняют различные действия в ответ на каждое сообщение: например, служба инвентаризации обновляет таблицы инвентаризации, чтобы отметить даты как зарезервированные, или, возможно, служба электронной почты отправляет электронное письмо с благодарностью гостю за бронирование. В этом стиле взаимодействия Кассандра и Кафка не связаны напрямую, а используются взаимодополняющим образом.

Настройка службы бронирования

Начнем с клонирования исходного кода Reservation Service с GitHub и проверки ветки, которая станет отправной точкой:

git clone -b kafka https://github.com/jeffreyscarpenter/reservation-service

После того, как это будет завершено, мы должны увидеть директорию reservation_service.

Давайте рассмотрим часть содержимого в коде.

Во-первых, мы рассмотрим схему, которую будем использовать для таблиц резервирования по пути reservation-service/src/main/resources/reservation.cql:

 * Copyright (C) 2016-2020 Jeff Carpenter

/* This file contains a slightly modified version of the "reservation" keyspace and table definitions
 * for the example defined in Chapter 5 of Cassandra: The Definitive Guide, 2nd and 3nd Editions.
 * The changes are to facilitate development exercises.

    WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE reservation.reservations_by_hotel_date (
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((hotel_id, start_date), room_number)

CREATE TABLE reservation.reservations_by_confirmation (
    confirm_number text PRIMARY KEY,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    guest_id uuid

 The following tables are provided for completeness with the book text, but they are not used in the current
 implementation of the Reservation Service

CREATE TABLE reservation.reservations_by_guest (
    guest_last_name text,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((guest_last_name), hotel_id)

CREATE TYPE reservation.address (
    street text,
    city text,
    state_or_province text,
    postal_code text,
    country text

CREATE TABLE reservation.guests (
    guest_id uuid PRIMARY KEY,
    first_name text,
    last_name text,
    title text,
    emails set<text>,
    phone_numbers list<text>,
    addresses map<text, frozen<address>>

Мы также хотим отметить зависимости для драйвера DataStax Java и клиентских библиотек Kafka по пути reservation-service/pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">


	<description>Demo service using Cassandra driver and Spring Boot</description>

		<relativePath />





		<!-- Create a RESTFul Service -->

		<!-- Document for REST Service -->

		<!-- Cassandra Driver -->


		<!-- Provides JSON serialization/deserialization for date/time types -->

		<!-- Tests -->
		<!-- Junit 5 -->
		<!-- Test against Docker Containers -->
		<!-- Add driver keys to spring-boot config file -->




В исходном коде есть несколько ключевых классов Java, которые мы могли бы также изучить:

Определение класса сущностей, используемое в HTTP API, которое представляет тип данных, которые мы сохраняем в Cassandra и публикуем в теме Kafka reservation-service/src/main/java/dev/cassandraguide/model/Reservation.java:

 * Copyright (C) 2017-2020 Jeff Carpenter
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package dev.cassandraguide.model;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.UUID;

 * Entity working with Reservation on Cassandra.
 * @author Jeff Carpenter
public class Reservation implements Serializable {

    /** Serial. */
    private static final long serialVersionUID = -3392237616280919281L;
    /** Hotel identifier, as Text not UUID (for simplicity). */
    private String hotelId;
    /** Formated as YYYY-MM-DD in interfaces. */
    private LocalDate startDate;
    /** Formated as YYYY-MM-DD in interfaces. */
    private LocalDate endDate;
    /** Room number. */
    private short roomNumber;
    /** UUID. */
    private UUID guestId;
    /** Confirmation for this Reservation. */
    private String confirmationNumber;
     * Default constructor
    public Reservation() {
     * Default constructor
    public Reservation(ReservationRequest form) {
     * Default constructor
    public Reservation(ReservationRequest form, String confirmationNumber) {
        this.confirmationNumber = confirmationNumber;

     * Getter accessor for attribute 'hotelId'.
     * @return
     *       current value of 'hotelId'
    public String getHotelId() {
        return hotelId;

     * Setter accessor for attribute 'hotelId'.
     * @param hotelId
     *      new value for 'hotelId '
    public void setHotelId(String hotelId) {
        this.hotelId = hotelId;

     * Getter accessor for attribute 'startDate'.
     * @return
     *       current value of 'startDate'
    public LocalDate getStartDate() {
        return startDate;

     * Setter accessor for attribute 'startDate'.
     * @param startDate
     *      new value for 'startDate '
    public void setStartDate(LocalDate startDate) {
        this.startDate = startDate;

     * Getter accessor for attribute 'endDate'.
     * @return
     *       current value of 'endDate'
    public LocalDate getEndDate() {
        return endDate;

     * Setter accessor for attribute 'endDate'.
     * @param endDate
     *      new value for 'endDate '
    public void setEndDate(LocalDate endDate) {
        this.endDate = endDate;

     * Getter accessor for attribute 'roomNumber'.
     * @return
     *       current value of 'roomNumber'
    public short getRoomNumber() {
        return roomNumber;

     * Setter accessor for attribute 'roomNumber'.
     * @param roomNumber
     *      new value for 'roomNumber '
    public void setRoomNumber(short roomNumber) {
        this.roomNumber = roomNumber;

     * Getter accessor for attribute 'guestId'.
     * @return
     *       current value of 'guestId'
    public UUID getGuestId() {
        return guestId;

     * Setter accessor for attribute 'guestId'.
     * @param guestId
     *      new value for 'guestId '
    public void setGuestId(UUID guestId) {
        this.guestId = guestId;

     * Getter accessor for attribute 'confirmationNumber'.
     * @return
     *       current value of 'confirmationNumber'
    public String getConfirmationNumber() {
        return confirmationNumber;

     * Setter accessor for attribute 'confirmationNumber'.
     * @param confirmationNumber
     * 		new value for 'confirmationNumber '
    public void setConfirmationNumber(String confirmationNumber) {
        this.confirmationNumber = confirmationNumber;
    /** {@inheritDoc} */
    public String toString() {
        return "Confirmation Number = " + confirmationNumber +
                ", Hotel ID: " + getHotelId() +
                ", Start Date = " + getStartDate() +
                ", End Date = " + getEndDate() +
                ", Room Number = " + getRoomNumber() +
                ", Guest ID = " + getGuestId();

Логика хранения данных сервиса, где мы будем выполнять большую часть нашей работы reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:

 * Copyright (C) 2017-2020 Jeff Carpenter
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 * @author Jeff Carpenter, Cedrick Lunven
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());

        // Will create tables (if they do not exist)
        // Prepare Statements of reservation
        logger.info("Application initialized.");
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
    public void cleanup() {
        if (null != cqlSession) {
            logger.info("+ CqlSession has been successfully closed");
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
     * Similar to exists() but maps and parses results.
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
     * Create new entry in multiple tables for this reservation.
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     public String upsert(Reservation reservation) {
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
        BatchStatement batchInsertReservation = BatchStatement

        // TODO: Publish message to Kafka containing reservation
         try {
             String reservationJson = objectMapper.writeValueAsString(reservation);
             // HINT: use the constructor ProducerRecord(String topic, K key, V value)
             // with the reservation confirmation number as the key, and the JSON string as the value
             ProducerRecord<String, String> record = null;
         } catch (Exception e) {
             logger.warn("Error publishing reservation message to Kafka: {}", e);

        return reservation.getConfirmationNumber();

     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     * @return
     *      list containing all reservations
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
     * Deleting a reservation.
     * @param confirmationNumber
     *      unique identifier for confirmation.
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =

            BatchStatement batchDeleteReservation = BatchStatement

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;

            return true;
        return false;
     * Search all reservation for an hotel id and LocalDate.
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects

     * Utility method to marshal a row as expected Reservation Bean.
     * @param row
     *      current row from ResultSet
     * @return
     *      object
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        return reservation;
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
    public void createReservationTables() {
         * Create TYPE 'Address' if not exists
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
                createType(keyspaceName, TYPE_ADDRESS)
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirm_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * );
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirm_number text
           * );
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            logger.info("Statements have been successfully prepared.");

Код, который создает подключение к Kafka reservation-service/src/main/java/dev/cassandraguide/conf/KafkaConfiguration.java:

package dev.cassandraguide.conf;

import dev.cassandraguide.repository.ReservationRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

 * Import Configuration from Configuration File
 * @author Jeff Carpenter
public class KafkaConfiguration {

    // Logger
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);

    // Bootstrap servers
    protected String bootstrapServers;

    // Kafka Client ID
    protected String clientId = "ReservationService";

    // Topic Name
    public String topicName = "reservation";

     * Default configuration.
    public KafkaConfiguration() {}

     * Initialization of Configuration.
     * @param bootstrapServers
     * @param clientId
     * @param topicName
    public KafkaConfiguration(
            String bootstrapServers, String clientId, String topicName) {
        this.bootstrapServers = bootstrapServers;
        this.clientId = clientId;
        this.topicName = topicName;

    public KafkaProducer<String, String> kafkaProducer() {
        logger.info("Creating Kafka Producer.");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);

     * Getter accessor for attribute 'bootstrapServers'.
     * @return
     *       current value of 'bootstrapServers'
    public String getBootstrapServers() {
        return bootstrapServers;

     * Setter accessor for attribute 'bootstrapServers'.
     * @param bootstrapServers
     * 		new value for 'bootstrapServers'
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;

     * Getter accessor for attribute 'clientId'.
     * @return
     *       current value of 'clientId'
    public String getClientId() {
        return clientId;

     * Setter accessor for attribute 'clientId'.
     * @param clientId
     * 		new value for 'clientId '
    public void setClientId(String clientId) {
        this.clientId = clientId;

     * Getter accessor for attribute 'topicName'.
     * @return
     *       current value of 'topicName'
    public String getTopicName() {
        return topicName;

     * Setter accessor for attribute 'topicName'.
     * @param topicName
     * 		new value for 'topicName '
    public void setTopicName(String topicName) {
        this.topicName = topicName;


Обратите внимание, что служба резервирования использует файл конфигурации для хранения информации о конфигурации Cassandra и Kafka, которую мы можем просмотреть здесь reservation-service/src/main/resources/application.yml:

# ----------------------------------------------------------
# Spring Boot Config
# ----------------------------------------------------------
    name: Reservation Reservicess
  port: 8080

# ----------------------------------------------------------
# DataStax Enterprise Java Driver Config
# ----------------------------------------------------------
  port: 9042
  keyspaceName: reservation
  localDataCenterName: datacenter1
  dropSchema: false

# ----------------------------------------------------------
# Kafka Producer Config
# ----------------------------------------------------------
  bootstrap-servers: localhost:9092
  client-id: ReservationService
  topicName: reservation

Таблицы Cassandra для хранения бронирований

В дистрибутив Cassandra входит cqlsh, оболочка для выдачи команд на CQL. Запустим оболочку:


Мы должны увидеть приглашение cqlsh> в терминале.

Как только подсказка будет доступна, мы можем загрузить файл схемы, который мы просмотрели ранее, чтобы создать пространство ключей, содержащее таблицы для резервирования:

SOURCE 'reservation-service/src/main/resources/reservation.cql';

Теперь мы можем просмотреть только что созданную схему с помощью команды DESCRIBE:


Давайте установим это как кейспейс для будущих команд:

USE reservation;

Обратите внимание, что существуют три разные таблицы для хранения данных резервирования с похожими столбцами, но с разными первичными ключами.

Наша естественная тенденция как специалистов по моделированию данных заключалась бы в том, чтобы сначала сосредоточиться на разработке таблиц для хранения бронирований и других записей, таких как профили гостей, и только потом начинать думать о запросах, которые будут получать к ним доступ. Но в моделировании данных Cassandra мы начинаем с наших запросов. Эти таблицы предназначены для поддержки запросов, которые определяют, как ваши пользователи будут получать доступ к бронированиям, что приводит к денормализованному дизайну.

Во-первых, таблица reservations_by_confirmation поддерживает поиск бронирований по уникальному номеру подтверждения, предоставленному клиенту во время бронирования:

CREATE TABLE reservations_by_confirmation 
confirm_number text, 
hotel_id text, 
start_date date, 
end_date date, 
room_number smallint, 
guest_id uuid, 
PRIMARY KEY (confirm_number) )

Вот пример загрузки строки данных в эту таблицу:

INSERT INTO reservations_by_confirmation (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);

Чтобы получить эту строку позже, мы использовали бы такой запрос:

SELECT * FROM reservations_by_confirmation WHERE confirm_number = 'RS2G0Z';

Во-вторых, таблица reservations_by_hotel_date позволяет персоналу отеля просматривать записи предстоящих бронирований по датам, чтобы получить представление о том, как работает отель, например, когда отель был заполнен или не заполнен:

CREATE TABLE reservations_by_hotel_date ( hotel_id text, start_date date, end_date date, room_number smallint, confirm_number text, guest_id uuid, PRIMARY KEY ((hotel_id, start_date), room_number) );

Вот пример загрузки той же брони, которую мы только что загрузили в reservations_by_confirmation, в эту дополнительную таблицу:

INSERT INTO reservations_by_hotel_date (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);

Эта таблица может поддерживать два запроса. Во-первых, поскольку ключ раздела содержит hotel_id и start_date, мы можем получить все бронирования для определенного отеля и даты:

SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08';

Также мы можем найти бронь на конкретный номер по дате:

SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08' AND room_number = 111;

Точно так же таблицу reservations_by_guest можно использовать для поиска бронирования по имени гостя для гостя, который забыл свой номер подтверждения. Для целей мы сосредоточимся на таблицах reservations_by_confirmation и reservations_by_hotel_date.

Мы заметим, что класс ReservationRepository использует инструкции Cassandra BATCH для группировки изменений в таблицах reservations_by_confirmation и reservations_by_hotel_date, чтобы они выполнялись как одна операция CQL reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:

 * Copyright (C) 2017-2020 Jeff Carpenter
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 * @author Jeff Carpenter, Cedrick Lunven
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());

        // Will create tables (if they do not exist)
        // Prepare Statements of reservation
        logger.info("Application initialized.");
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
    public void cleanup() {
        if (null != cqlSession) {
            logger.info("+ CqlSession has been successfully closed");
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
     * Similar to exists() but maps and parses results.
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
     * Create new entry in multiple tables for this reservation.
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     public String upsert(Reservation reservation) {
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
        BatchStatement batchInsertReservation = BatchStatement

        // TODO: Publish message to Kafka containing reservation
         try {
             String reservationJson = objectMapper.writeValueAsString(reservation);
             // HINT: use the constructor ProducerRecord(String topic, K key, V value)
             // with the reservation confirmation number as the key, and the JSON string as the value
             ProducerRecord<String, String> record = null;
         } catch (Exception e) {
             logger.warn("Error publishing reservation message to Kafka: {}", e);

        return reservation.getConfirmationNumber();

     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     * @return
     *      list containing all reservations
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
     * Deleting a reservation.
     * @param confirmationNumber
     *      unique identifier for confirmation.
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =

            BatchStatement batchDeleteReservation = BatchStatement

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;

            return true;
        return false;
     * Search all reservation for an hotel id and LocalDate.
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects

     * Utility method to marshal a row as expected Reservation Bean.
     * @param row
     *      current row from ResultSet
     * @return
     *      object
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        return reservation;
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
    public void createReservationTables() {
         * Create TYPE 'Address' if not exists
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
                createType(keyspaceName, TYPE_ADDRESS)
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirm_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * );
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirm_number text
           * );
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            logger.info("Statements have been successfully prepared.");

Теперь мы закончили использовать cqlsh, давайте выйдем:


Устанавливаем и запускаем Apache Kafka

Теперь давайте сосредоточимся на расширении службы резервирования, чтобы она публиковала события в Apache Kafka.

Сначала скачиваем и устанавливаем Kafka:

wget http://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz

tar xzf kafka_2.12-2.5.0.tgz

Теперь у вас есть папка в вашей файловой системе с именем _kafka_x.x-x.x.x_ (на основе версий Scala и Kafka соответственно).

Чтобы запустить кластер Kafka с одним узлом, вам нужно запустить два процесса: Zookeeper и брокер Kafka. Kafka использует Zookeeper для управления кластером. Он работает вместе с каждым брокером и гарантирует, что брокер может взаимодействовать с кластером. Zookeeper должен быть запущен, чтобы брокер мог получать или передавать сообщения. Запускаем Zookeeper командой:

kafka_2.12-2.5.0/bin/zookeeper-server-start.sh kafka_2.12-2.5.0/config/zookeeper.properties &> zookeeper_start.log &

Далее запускаем брокера Kafka:

kafka_2.12-2.5.0/bin/kafka-server-start.sh kafka_2.12-2.5.0/config/server.properties &> kafka_start.log &

Создаем Kafka Topic и публикуем сообщения

Kafka поддерживает стиль обмена сообщениями публикации и подписки, при котором сообщения публикуются в темах в формате ключ-значение. Подобно Cassandra, Kafka разделяет данные с помощью ключа и реплицирует данные между несколькими узлами, известными как брокеры в Kafka.

Теперь давайте создадим топик Kafka для данных бронирования:

kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reservation --config retention.ms=-1

Далее мы обновим службу резервирования, чтобы публиковать ее в разделе Kafka о бронировании каждый раз, когда резервирование изменяется или удаляется. Мы будем вводить элементы API понемногу, и сначала мы получим опыт работы с ними в jshell, Java REPL, прежде чем обновлять фактический код в службе бронирования:

cd reservation-service mvn -q compile com.github.johnpoth:jshell-maven-plugin:1.3:run

Давайте создадим KafkaProducer, который мы можем использовать для публикации сообщений в топике Kafka:

import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "ReservationService"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Давайте создадим объект Reservation, используя класс, который служба Reservation использует для представления обрабатываемого нами бронирования:

import java.time.LocalDate; import java.util.UUID; import dev.cassandraguide.model.Reservation; String confirmationNumber = "RS2G0Z" Reservation reservation = new Reservation(); reservation.setConfirmationNumber(confirmationNumber); reservation.setStartDate(LocalDate.now()); reservation.setEndDate(LocalDate.now().plusDays(2)); reservation.setHotelId("NY456"); reservation.setGuestId(UUID.fromString("1b4d86f4-ccff-4256-a63d-45c905df2677")); reservation.setRoomNumber((short)111);

Теперь сериализуйте этот класс в строку JSON, используя Jackson ObjectMapper:

Параметры, выбранные в objectMapper, предназначены для упрощения форматирования даты и красивой печати JSON. Чтобы увидеть результирующую строку, мы выполняем:


Теперь мы можем опубликовать сообщение в топике бронирования Kafka, используя confirmNumber в качестве ключа и reservationJson в качестве полезной нагрузки:

ProducerRecord<String, String> record = new ProducerRecord<>("reservation", confirmationNumber, reservationJson); producer.send(record);

Выйдем из jshell: exit

Давайте воспользуемся простым консьюмером командной строки, чтобы прочитать сообщение, которое мы только что опубликовали:

cd kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic reservation --max-messages 10

Выйти из консьюмера можно с помощью Ctrl+C.

Обновление службы бронирования для публикации в Kafka

Теперь, когда у нас есть некоторый опыт публикации в Kafka, давайте обновим службу резервирования, чтобы публиковать события в Kafka при изменении или удалении резервирования. Взглянем на класс ReservationRepository:

 * Copyright (C) 2017-2020 Jeff Carpenter
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 * @author Jeff Carpenter, Cedrick Lunven
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());

        // Will create tables (if they do not exist)
        // Prepare Statements of reservation
        logger.info("Application initialized.");
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
    public void cleanup() {
        if (null != cqlSession) {
            logger.info("+ CqlSession has been successfully closed");
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
     * Similar to exists() but maps and parses results.
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
     * Create new entry in multiple tables for this reservation.
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     public String upsert(Reservation reservation) {
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
        BatchStatement batchInsertReservation = BatchStatement

        // TODO: Publish message to Kafka containing reservation
         try {
             String reservationJson = objectMapper.writeValueAsString(reservation);
             // HINT: use the constructor ProducerRecord(String topic, K key, V value)
             // with the reservation confirmation number as the key, and the JSON string as the value
             ProducerRecord<String, String> record = null;
         } catch (Exception e) {
             logger.warn("Error publishing reservation message to Kafka: {}", e);

        return reservation.getConfirmationNumber();

     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     * @return
     *      list containing all reservations
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
     * Deleting a reservation.
     * @param confirmationNumber
     *      unique identifier for confirmation.
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =

            BatchStatement batchDeleteReservation = BatchStatement

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;

            return true;
        return false;
     * Search all reservation for an hotel id and LocalDate.
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects

     * Utility method to marshal a row as expected Reservation Bean.
     * @param row
     *      current row from ResultSet
     * @return
     *      object
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        return reservation;
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
    public void createReservationTables() {
         * Create TYPE 'Address' if not exists
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
                createType(keyspaceName, TYPE_ADDRESS)
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirm_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * );
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirm_number text
           * );
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            logger.info("Statements have been successfully prepared.");

TODO в этом классе включают несколько пунктов, которые мы должны рассмотреть:

  • TODO: проверяет импорт для публикации в Kafka. Обратите внимание, что классы, которые вы использовали ранее, импортированы.

  • TODO: Это обзор переменных, используемых для публикации в Kafka. В репозитории хранятся ObjectMapper и KafkaProducer.

  • TODO: Проверка инициализации объектов, необходимых для публикации в Kafka, инициализация переменных.

Теперь самое интересное — реализация кода для публикации сообщений:

  • TODO: Это публикует сообщение Кафке, содержащее reservation. В методе upsert() опубликуйте сообщение, содержащее резервирование, в виде строки JSON.

  • TODO: публикует сообщение для Kafka с пустой полезной нагрузкой, указывающее на удаление. В методе delete() публикует сообщение об удалении резервирования.

Подправим наш код:

 * Copyright (C) 2017-2020 Jeff Carpenter
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 * @author Jeff Carpenter, Cedrick Lunven
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRMATION_NUMBER        = CqlIdentifier.fromCql("confirmation_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());

        // Will create tables (if they do not exist)
        // Prepare Statements of reservation
        logger.info("Application initialized.");
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
    public void cleanup() {
        if (null != cqlSession) {
            logger.info("+ CqlSession has been successfully closed");
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
     * Similar to exists() but maps and parses results.
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
     * Create new entry in multiple tables for this reservation.
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     public String upsert(Reservation reservation) {
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
        BatchStatement batchInsertReservation = BatchStatement

        // TODO: Publish message to Kafka containing reservation
         try {
             String reservationJson = objectMapper.writeValueAsString(reservation);
             ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), reservationJson);
         } catch (Exception e) {
             logger.warn("Error publishing reservation message to Kafka: {}", e);

        return reservation.getConfirmationNumber();

     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     * @return
     *      list containing all reservations
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
     * Deleting a reservation.
     * @param confirmationNumber
     *      unique identifier for confirmation.
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =

            BatchStatement batchDeleteReservation = BatchStatement

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), "");

            return true;
        return false;
     * Search all reservation for an hotel id and LocalDate.
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects

     * Utility method to marshal a row as expected Reservation Bean.
     * @param row
     *      current row from ResultSet
     * @return
     *      object
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        return reservation;
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
    public void createReservationTables() {
         * Create TYPE 'Address' if not exists
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
                createType(keyspaceName, TYPE_ADDRESS)
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirmation_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * ) WITH comment = 'Q7. Find reservations by hotel and date';
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirmation_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .withPartitionKey(CONFIRMATION_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirmation_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * ) WITH comment = 'Q8. Find reservations by guest name';
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirmation_number text
           * ) WITH comment = 'Q9. Find guest by ID';
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRMATION_NUMBER)
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
            logger.info("Statements have been successfully prepared.");

И переходим к тестам.

Служба резервирования также включает тесты, которые выполняются для контейнера Cassandra, запущенного в Docker, который запускается для целей теста, а затем уничтожается.

Перед запуском этого теста мы хотим отключить узел Cassandra:

kill `cat /tmp/cassandra-pid`

Теперь мы можем запустить тесты с помощью Maven:

mvn test

Мы узнали, как написать микросервис, который сохраняет данные в Apache Cassandra с помощью Java-драйвера DataStax и генерирует события для изменения данных в Apache Kafka.

