Subir archivos por partes en s3 java reactiva

Subir archivos por partes a s3, archivos muy grandes.

Uno de los principales problemas al manejar grandes cantidades de datos es la capacidad limitada de la máquina encargada de procesarlos. Almacenar la información en memoria no garantiza un sistema resiliente, ya que, en caso de falla, se requeriría reiniciar el proceso desde cero. A continuación, se presenta un caso de uso para una estación de datos, cuyo almacenamiento posterior se realiza en un archivo en S3. Este enfoque permitirá paralelizar las consultas y transformaciones en diferentes nodos, de manera que, si alguno de ellos falla, solo se necesite reprocesar la parte faltante y no toda la información.

  1. Initiate Multipart Upload: (master) , Cuando envía una solicitud para iniciar una carga multipart.
  2. Carga de las partes: (Slaves) En cada solicitud de carga, debe incluir el ID de la carga multiparte que obtuvo en el paso 1.
  3. Finalización (o detención) de una carga multiparte:(master), Después de cargar todas las partes del archivo, puede utilizar la operación de finalización. Una vez más, debe especificar el ID de carga en la solicitud. 

Ejemplo de uso extrayendo datos de una base de datos, el cual usa la funcionalidad anterior mente mencionada, pero el tema de paralelismo y entre los nodos depende mucho de tu arquitectura o diseño, :

   public Mono<Void> uploadFile(LocalDate processDate, String bucketName, String kmsKeyId) {
        String objectKey = "folder/file.txt";
        Mono<String> headerMono = Mono.fromCallable(this::getLineHeaderFile);
        return s3Operation.initiateMultipartUpload(bucketName, objectKey , kmsKeyId)
                .zipWith(headerMono)
                .flatMap(res -> uploadFilePart(Tuples.of(processDate, res.getT2()),
                        bucketName, objectKey, res.getT1())
                        .zipWith(Mono.just(res))
                )
                .flatMap(res1 -> {
                    var uploadId = res1.getT2().getT1();
                    return s3Operation.completeMultipartUpload(bucketName, objectKey, uploadId, res1.getT1());
                })
                .flatMap(res -> {
                    log.info(res);
                    return Mono.empty();
                });
    }

master


   
    private String getLineDetailFile(Data  data){
    //justify data a string
    return "test data";

    }
   
   public Flux<String> getDataToFile(LocalDate initRange, LocalDate endRange) {
        return infoReaderRepository
                .getAll(initRange, endRange) // get info
                .map(this::getLineDetailFile);
    }
 
 public Flux<Tuple2<Long, List<String>>> getRangeDate(LocalDate processDate) {
    return Flux.just("1", "2")
                .flatMap(res -> getDataToFile(res.getT1(), res.getT2()))
                .buffer(100000)
                .index()
                .parallel()
                .runOn(Schedulers.boundedElastic()).sequential();
    }
  
  
  
  public Mono<List<Tuple2<Integer, String>>> uploadFilePart(Tuple2<LocalDate, String> processDate,
                                                                            String bucketName, String objectKey,
                                                                            String uploadId) {

        return getRangeDate(processDate.getT1())
                .flatMap(res -> getByte(res.getT2(), res.getT1(), processDate.getT2())
                        .zipWith(Mono.just(res.getT1())
                        ))
                .flatMap(res -> {
                    log.info("length  " + res.getT1().length);
                    var partNumber = Math.toIntExact(res.getT2() + 1);
                    return s3Operation.multipartUpload(bucketName, objectKey, uploadId, res.getT1(), partNumber)
                            .map(tag -> Tuples.of(partNumber, tag));
                })
                .collectList();
    }
    

Slaves

validamos las partes o la data pese mas de 5 mg , y concatenamos el header del archivo en este caso

 public Mono<byte[]> getByte(List<String> res, Long index, String header) {
        return Mono.just(res.stream()
                        .reduce("", (acc, line) -> acc + line).
                        getBytes(StandardCharsets.UTF_8))
                .map(str -> {
                    byte[] byteArray = str;
                    final int MIN_SIZE = 5 * 1024 * 1024;
                    if (index == 0) {
                        byte[] bytes1 = header.getBytes(StandardCharsets.UTF_8);
                        byte[] combinedBytes = new byte[bytes1.length + str.length];
                        System.arraycopy(bytes1, 0, combinedBytes, 0, bytes1.length);
                        System.arraycopy(str, 0, combinedBytes, bytes1.length, str.length);
                        byteArray = combinedBytes;
                    }
                    if ((byteArray.length < MIN_SIZE)) {
                        var paddedArray = new byte[MIN_SIZE];
                        System.arraycopy(byteArray, 0, paddedArray, 0, byteArray.length);
                        Arrays.fill(paddedArray, byteArray.length, MIN_SIZE, (byte) 0);
                        byteArray = paddedArray;
                    }
                    return byteArray;
                });
    }
    

Slaves

En el adaptador de s3 vamos a tener la implementación de s3Operation

AdapterS3.java

private final S3BucketOperations s3Operations;


    @Override
    public Mono<List<String>> getItems(String bucketName) {
        return s3Operations.listBucketObjects(bucketName)
                .flatMap(s3Objects ->
                        Mono.just(
                                s3Objects.stream().map(S3Object::key)
                                        .collect(Collectors.toList())
                        )
                );

    }


    @Override
    public Mono<String> initiateMultipartUpload(String bucketName, String objectKey , String kmsKeyId) {
        CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(objectKey)
                .serverSideEncryption(ServerSideEncryption.AWS_KMS)
                .ssekmsKeyId(kmsKeyId)
                .build();
        return s3Operations.initiateMultipartUpload(createRequest);
    }


    @Override
    public Mono<String> multipartUpload(String bucketName, String objectKey, String uploadId, byte[] part, int partNumber) {

        return s3Operations.uploadPart(bucketName, objectKey, uploadId, partNumber, part)
                .map(UploadPartResponse::eTag)
                .log(" Part uploaded: ")
                .onErrorResume(e -> {
                    log.info("Error multipartUpload part: " + e.getMessage());
                    return Mono.error(e);
                });

    }

    @Override
    public Mono<String> completeMultipartUpload(String bucketName, String objectKey, String uploadId, List<Tuple2<Integer, String>> eTags) {
        List<CompletedPart> completedParts = eTags.stream()
                .map(tuple ->
                        CompletedPart.builder()
                                .partNumber(tuple.getT1())
                                .eTag(tuple.getT2())
                                .build()
                )
                .sorted(Comparator.comparingInt(CompletedPart::partNumber))
                .collect(Collectors.toList());

        log.info("completedParts: " + completedParts);
        CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(objectKey)
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                .build();
        return s3Operations.completeMultipartUpload(completeRequest)
                .then(Mono.just(uploadId))
                .onErrorResume(e -> {
                    log.info("Error completeMultipartUpload part: " + e.getMessage());
                    return Mono.error(e);
                });
    }

    

S3BucketOperations.java


    private final S3AsyncClient s3AsyncClient;


    public Mono<Void> completeMultipartUpload( CompleteMultipartUploadRequest completeRequest ) {

        return Mono.fromFuture(() -> s3AsyncClient.completeMultipartUpload(completeRequest))
                .then()
                .onErrorResume(e ->{
                    log.info("Error completeMultipartUpload part: "+e.getMessage());
                    return Mono.error(e);
                });
    }

    public Mono<UploadPartResponse> uploadPart(String bucketName, String objectKey, String uploadId,
                                                Integer partNumber, byte[] partData) {
        UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                .bucket(bucketName)
                .key(objectKey)
                .uploadId(uploadId)
                .partNumber(partNumber)
                .build();

        return Mono.fromFuture(() -> s3AsyncClient.uploadPart(uploadPartRequest , AsyncRequestBody.fromBytes(partData)))
                .onErrorResume(e ->{
                  log.info("Error uploading part: "+e.getMessage());
                    return Mono.error(e);
                });
    }
    public Mono<String> initiateMultipartUpload(CreateMultipartUploadRequest createRequest) {

        return Mono.fromFuture(() -> s3AsyncClient.createMultipartUpload(createRequest))
                .map(CreateMultipartUploadResponse::uploadId)
                .onErrorResume(e ->{
                    log.info("Error initiateMultipartUpload part: "+e.getMessage());
                    return Mono.error(e);
                });
    }


    public Mono<Boolean> uploadObject(String bucketName,String objectKey, byte[] fileContent) {

        return Mono.fromFuture(
                s3AsyncClient.putObject(configurePutObject(bucketName,objectKey),
                        AsyncRequestBody.fromBytes(fileContent)))
                .map(response -> response.sdkHttpResponse().isSuccessful());
    }

    public Mono<List<S3Object>> listBucketObjects(String bucketName){
        return Mono.fromFuture(s3AsyncClient.listObjects(ListObjectsRequest
                .builder()
                .bucket(bucketName)
                .build())).map(ListObjectsResponse::contents);
    }

    public Mono<InputStream> getObject(String bucketName,String objectKey) {
        Mono.fromFuture(s3AsyncClient.deleteObject(DeleteObjectRequest.builder()
                .bucket(bucketName)
                .key(objectKey).build()))
                .delayElement(Duration.ofMinutes(1))
                .subscribe();
        return Mono.fromFuture(s3AsyncClient.getObject(GetObjectRequest.builder()
                .key(objectKey)
                .bucket(bucketName)
                .build(), AsyncResponseTransformer.toBytes()))
                .map(BytesWrapper::asInputStream);
    }

    private PutObjectRequest configurePutObject(String bucketName, String objectKey) {
        return PutObjectRequest.builder()
                .bucket(bucketName)
                .key(objectKey)
                .build();
    }
    

happy code :).

Referencias:

Carga de archivos grandes por partes con Amazon SDK for Java - Amazon S3 Glacier
Ejemplos de código Java acerca de cómo cargar archivos grandes por partes en S3 Glacier con Amazon SDK para Java.
​​How to Upload Large Files Efficiently with AWS S3 Multipart Upload
Imagine running a media streaming platform where users upload large high-definition videos. Uploading such large files can be slow and may fail if the network is unreliable. Using traditional single-part uploads can be cumbersome and inefficient for…