Subir archivos por partes en s3 java reactiva
Subir archivos por partes a s3, archivos muy grandes.
Uno de los principales problemas al manejar grandes cantidades de datos es la capacidad limitada de la máquina encargada de procesarlos. Almacenar la información en memoria no garantiza un sistema resiliente, ya que, en caso de falla, se requeriría reiniciar el proceso desde cero. A continuación, se presenta un caso de uso para una estación de datos, cuyo almacenamiento posterior se realiza en un archivo en S3. Este enfoque permitirá paralelizar las consultas y transformaciones en diferentes nodos, de manera que, si alguno de ellos falla, solo se necesite reprocesar la parte faltante y no toda la información.
- Initiate Multipart Upload: (master) , Cuando envía una solicitud para iniciar una carga multipart.
- Carga de las partes: (Slaves) En cada solicitud de carga, debe incluir el ID de la carga multiparte que obtuvo en el paso 1.
- Finalización (o detención) de una carga multiparte:(master), Después de cargar todas las partes del archivo, puede utilizar la operación de finalización. Una vez más, debe especificar el ID de carga en la solicitud.
Ejemplo de uso extrayendo datos de una base de datos, el cual usa la funcionalidad anterior mente mencionada, pero el tema de paralelismo y entre los nodos depende mucho de tu arquitectura o diseño, :
validamos las partes o la data pese mas de 5 mg , y concatenamos el header del archivo en este caso
En el adaptador de s3 vamos a tener la implementación de s3Operation
AdapterS3.java
private final S3BucketOperations s3Operations;
@Override
public Mono<List<String>> getItems(String bucketName) {
return s3Operations.listBucketObjects(bucketName)
.flatMap(s3Objects ->
Mono.just(
s3Objects.stream().map(S3Object::key)
.collect(Collectors.toList())
)
);
}
@Override
public Mono<String> initiateMultipartUpload(String bucketName, String objectKey , String kmsKeyId) {
CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectKey)
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
.ssekmsKeyId(kmsKeyId)
.build();
return s3Operations.initiateMultipartUpload(createRequest);
}
@Override
public Mono<String> multipartUpload(String bucketName, String objectKey, String uploadId, byte[] part, int partNumber) {
return s3Operations.uploadPart(bucketName, objectKey, uploadId, partNumber, part)
.map(UploadPartResponse::eTag)
.log(" Part uploaded: ")
.onErrorResume(e -> {
log.info("Error multipartUpload part: " + e.getMessage());
return Mono.error(e);
});
}
@Override
public Mono<String> completeMultipartUpload(String bucketName, String objectKey, String uploadId, List<Tuple2<Integer, String>> eTags) {
List<CompletedPart> completedParts = eTags.stream()
.map(tuple ->
CompletedPart.builder()
.partNumber(tuple.getT1())
.eTag(tuple.getT2())
.build()
)
.sorted(Comparator.comparingInt(CompletedPart::partNumber))
.collect(Collectors.toList());
log.info("completedParts: " + completedParts);
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectKey)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
.build();
return s3Operations.completeMultipartUpload(completeRequest)
.then(Mono.just(uploadId))
.onErrorResume(e -> {
log.info("Error completeMultipartUpload part: " + e.getMessage());
return Mono.error(e);
});
}
S3BucketOperations.java
private final S3AsyncClient s3AsyncClient;
public Mono<Void> completeMultipartUpload( CompleteMultipartUploadRequest completeRequest ) {
return Mono.fromFuture(() -> s3AsyncClient.completeMultipartUpload(completeRequest))
.then()
.onErrorResume(e ->{
log.info("Error completeMultipartUpload part: "+e.getMessage());
return Mono.error(e);
});
}
public Mono<UploadPartResponse> uploadPart(String bucketName, String objectKey, String uploadId,
Integer partNumber, byte[] partData) {
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(objectKey)
.uploadId(uploadId)
.partNumber(partNumber)
.build();
return Mono.fromFuture(() -> s3AsyncClient.uploadPart(uploadPartRequest , AsyncRequestBody.fromBytes(partData)))
.onErrorResume(e ->{
log.info("Error uploading part: "+e.getMessage());
return Mono.error(e);
});
}
public Mono<String> initiateMultipartUpload(CreateMultipartUploadRequest createRequest) {
return Mono.fromFuture(() -> s3AsyncClient.createMultipartUpload(createRequest))
.map(CreateMultipartUploadResponse::uploadId)
.onErrorResume(e ->{
log.info("Error initiateMultipartUpload part: "+e.getMessage());
return Mono.error(e);
});
}
public Mono<Boolean> uploadObject(String bucketName,String objectKey, byte[] fileContent) {
return Mono.fromFuture(
s3AsyncClient.putObject(configurePutObject(bucketName,objectKey),
AsyncRequestBody.fromBytes(fileContent)))
.map(response -> response.sdkHttpResponse().isSuccessful());
}
public Mono<List<S3Object>> listBucketObjects(String bucketName){
return Mono.fromFuture(s3AsyncClient.listObjects(ListObjectsRequest
.builder()
.bucket(bucketName)
.build())).map(ListObjectsResponse::contents);
}
public Mono<InputStream> getObject(String bucketName,String objectKey) {
Mono.fromFuture(s3AsyncClient.deleteObject(DeleteObjectRequest.builder()
.bucket(bucketName)
.key(objectKey).build()))
.delayElement(Duration.ofMinutes(1))
.subscribe();
return Mono.fromFuture(s3AsyncClient.getObject(GetObjectRequest.builder()
.key(objectKey)
.bucket(bucketName)
.build(), AsyncResponseTransformer.toBytes()))
.map(BytesWrapper::asInputStream);
}
private PutObjectRequest configurePutObject(String bucketName, String objectKey) {
return PutObjectRequest.builder()
.bucket(bucketName)
.key(objectKey)
.build();
}
happy code :).
Referencias: