spring webflux conectar reader and writer rds

En el mundo de la programación reactiva, Spring WebFlux ha surgido como una poderosa herramienta para construir aplicaciones robustas y eficientes. En esta guía, exploraremos cómo conectar Reader y Writer a una base de datos relacional (RDS) utilizando Spring WebFlux.

La interacción fluida entre la capa de lectura y escritura es fundamental para el desarrollo de aplicaciones de alta calidad. En este contexto, aprovechar la capacidad de WebFlux para gestionar operaciones de entrada y salida de manera reactiva puede marcar la diferencia en el rendimiento y la escalabilidad de tu aplicación.

Desde la configuración inicial hasta la implementación práctica, te guiaremos a través de los pasos necesarios para integrar eficazmente Reader y Writer con tu base de datos relacional utilizando Spring WebFlux. Prepárate para sumergirte en un viaje de descubrimiento mientras desbloqueamos los secretos de esta potente combinación tecnológica.

Para este ejemplo vamos a usar scaffold-clean-architecture para seguir patrón de desarrollo de buenas practicas, la cual no va ser una limitante si se implementa en otros patrones.

  1. Creamos 2 paquetes uno para Writer y para Reader
Writer -> co.ncleguizamon.posgres.r2dbc-postgresql
Reader -> co.ncleguizamon.posgres.r2dbc-postgresql-reader

para el caso de clean architecture seria generar dos Driven Adapter por comandos

Writer

   gradle generateDrivenAdapter --type=r2dbc
   

Reader

   gradle generateDrivenAdapter --type=generic --name=r2dbc-postgresql-reader 
   
  1. Crear bean
@Bean
    @Profile("local")
    public SecretPostgres secretsModelLocal(PostgresqlConnectionProperties postgresqlProperties) {
        return SecretPostgres.builder()
                .dbname(postgresqlProperties.getDatabase())
                .host(postgresqlProperties.getHost())
                .password(postgresqlProperties.getPassword())
                .username(postgresqlProperties.getUsername())
                .port(postgresqlProperties.getPort())
                .schema(postgresqlProperties.getSchema())
                .urlReaderRds(postgresqlProperties.geturlReaderRds())
                .build();
    }   
  1. En cada paquete creamos nuestra configuración de nuestra carpeta de config

y una clase de configuración, en este caso llamada Config.java

Writer

@Configuration
@AllArgsConstructor
@EnableConfigurationProperties({PostgresqlConnectionProperties.class})
@EnableR2dbcRepositories(basePackages = "co.ncleguizamon.posgres.r2dbc-postgresql", entityOperationsRef = "postgresEntityTemplate")
public class Config {
    public static final int INITIAL_SIZE = 1;
    public static final int MAX_SIZE = 2;
    public static final int MAX_IDLE_TIME = 30;

    SecretPostgres secretPostgres;

    @Bean
    public R2dbcEntityOperations postgresEntityTemplate(
            @Qualifier("postgresConnection") ConnectionFactory connectionFactory) {
        DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
        return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE);
    }

    @Bean(name = "postgresConnection")
    public ConnectionPool getConnectionConfig() {
        PostgresqlConnectionConfiguration dbConfiguration = PostgresqlConnectionConfiguration.builder()
                .host(secretPostgres.getHost())
                .port(secretPostgres.getPort())
                .database(secretPostgres.getDbname())
                .schema(secretPostgres.getSchema())
                .username(secretPostgres.getUsername())
                .password(secretPostgres.getPassword())
                .build();

        ConnectionPoolConfiguration poolConfiguration = ConnectionPoolConfiguration.builder()
                .connectionFactory(new PostgresqlConnectionFactory(dbConfiguration))
                .name("api-postgres-connection-pool")
                .initialSize(INITIAL_SIZE)
                .maxSize(MAX_SIZE)
                .maxIdleTime(Duration.ofMinutes(MAX_IDLE_TIME))
                .validationQuery("SELECT 1")
                .build();

        return new ConnectionPool(poolConfiguration);
    
    // opcinal si usas 

    @Bean
    public TransactionalOperator transactionalOperator(
            @Qualifier("postgresConnection") ConnectionFactory connectionFactory) {
        return TransactionalOperator.create(new R2dbcTransactionManager(connectionFactory));
    }
}   

PostgresqlConnectionProperties.java

@Data
@ConfigurationProperties(prefix = "spring.r2bdc-postgres")
public class PostgresqlConnectionProperties {

    private String database;
    private String schema;
    private String username;
    private String password;
    private String host;
    private String urlReaderRds;
    private Integer port;
    

}
   

Reader

@Configuration
@AllArgsConstructor
@EnableR2dbcRepositories(basePackages = "co.ncleguizamon.posgres.r2dbc-postgresql-reader",
        entityOperationsRef = "postgresReaderEntityTemplate")
public class Config{
    public static final int INITIAL_SIZE = 3;
    public static final int MAX_SIZE = 6;
    public static final int MAX_IDLE_TIME = 30;

    SecretPostgres secretPostgres;

    @Bean
    public R2dbcEntityOperations postgresReaderEntityTemplate(
            @Qualifier("postgresReaderConnection") ConnectionFactory connectionFactory) {
        DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
        return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE);
    }

    @Bean(name = "postgresReaderConnection")
    public ConnectionPool getConnectionConfig() String urlReaderRds) {
        PostgresqlConnectionConfiguration dbConfiguration = PostgresqlConnectionConfiguration.builder()
                .host(secretPostgres.urlReaderRds == null? secretPostgres.getHost(): secretPostgres.urlReaderRds)
                .port(secretPostgres.getPort())
                .database(secretPostgres.getDbname())
                .schema(secretPostgres.getSchema())
                .username(secretPostgres.getUsername())
                .password(secretPostgres.getPassword())
                .build();

        ConnectionPoolConfiguration poolConfiguration = ConnectionPoolConfiguration.builder()
                .connectionFactory(new PostgresqlConnectionFactory(dbConfiguration))
                .name("api-postgres-connection-pool-reader")
                .initialSize(INITIAL_SIZE)
                .maxSize(MAX_SIZE)
                .maxIdleTime(Duration.ofMinutes(MAX_IDLE_TIME))
                .validationQuery("SELECT 1")
                .build();

        return new ConnectionPool(poolConfiguration);
    }
   

@EnableR2dbcRepositories

@EnableR2dbcRepositories se utiliza para habilitar los repositorios de Spring Data R2DBC para el paquete base especificado (com.example.repository). Spring generará automáticamente implementaciones para las interfaces de repositorio que se encuentren en este paquete, lo que te permitirá usarlas en tu aplicación.

Asegúrate de haber incluido las dependencias necesarias en tu proyecto para usar Spring Data R2DBC, como spring-boot-starter-data-r2dbc si estás utilizando Spring Boot.

entityOperationsRef

Referencia del bean que hace referencia a R2dbcEntityOperations, según sea el repository, configurado .

Para usar cada uno de los dos paquetes , debes implementar unas interface por cada uno de los paquetes , para poder usarlos en tus casos de uso , ejemplo.

Reader , solo lecturas

public interface UserReaderService {


    Flux<User> findByAllId(Long id);

    Flux<User>  getList(Long id);
    ....

}

   

Writer , updates , create , delete

public interface UserWriterService {


    Mono<User> deleteById(Long id);
    Mono<User> create(Long id);

    .....
    
}

   

Implementa en tu Adapter, según sea el caso.

@Repository
@RequiredArgsConstructor
public class UserRepositoryAdapter implements UserReaderService {
// repository en el paquete postgresReaderEntityTemplate
    private final UserDataRepository userDataRepository;


    ....


    
   

ejemplo: reader

recuerda si usas jpa debes usar @EnableJpaRepositories

Reference:

EnableR2dbcRepositories (Spring Data R2DBC 3.2.5 API)
declaration: package: org.springframework.data.r2dbc.repository.config, annotation type: EnableR2dbcRepositories
Repositories cannot be used with two different database systems · Issue #406 · spring-projects/spring-data-r2dbc
There is a conflict with methods query generator in repository when you have 2 different database servers mssql and mysql at the same time. And it renders query with dialect of only certain databas…