Merge pull request #17914 from iterate-ch/bugfix/CTERA-355-directio-nobulk

Remove Bulk feature usage
This commit is contained in:
David Kocher
2026-03-17 17:14:19 +01:00
committed by GitHub
21 changed files with 322 additions and 212 deletions
@@ -78,7 +78,7 @@ public class B2WriteFeature extends AbstractHttpWriteFeature<BaseB2Response> imp
try {
final Checksum checksum = status.getChecksum();
if(status.isSegment()) {
final B2GetUploadPartUrlResponse uploadUrl = session.getClient().getUploadPartUrl(status.getParameters().get("fileId"));
final B2GetUploadPartUrlResponse uploadUrl = session.getClient().getUploadPartUrl(status.getParameters().get("fileId").toString());
return session.getClient().uploadLargeFilePart(uploadUrl, status.getPart(), entity, checksum.hash);
}
else {
@@ -185,8 +185,8 @@ public class BoxWriteFeature extends AbstractHttpWriteFeature<File> {
final HttpRange range = HttpRange.withStatus(new TransferStatus()
.setLength(status.getLength())
.setOffset(status.getOffset()));
final String uploadSessionId = status.getParameters().get(BoxLargeUploadService.UPLOAD_SESSION_ID);
final String overall_length = status.getParameters().get(BoxLargeUploadService.OVERALL_LENGTH);
final String uploadSessionId = status.getParameters().get(BoxLargeUploadService.UPLOAD_SESSION_ID).toString();
final String overall_length = status.getParameters().get(BoxLargeUploadService.OVERALL_LENGTH).toString();
log.debug("Send range {} for file {}", range, file);
final HttpPut request = new HttpPut(String.format("%s/files/upload_sessions/%s", client.getBasePath(), uploadSessionId));
// Must not overlap with the range of a part already uploaded this session.
@@ -160,7 +160,7 @@ public class TransferStatus implements TransferResponse, StreamCancelation, Stre
private Long modified;
private Long created;
private Map<String, String> parameters
private Map<String, ?> parameters
= Collections.emptyMap();
private Map<String, String> metadata
@@ -522,11 +522,11 @@ public class TransferStatus implements TransferResponse, StreamCancelation, Stre
return this;
}
public Map<String, String> getParameters() {
public Map<String, ?> getParameters() {
return parameters;
}
public TransferStatus setParameters(final Map<String, String> parameters) {
public TransferStatus setParameters(final Map<String, ?> parameters) {
this.parameters = parameters;
return this;
}
@@ -1,7 +1,7 @@
package ch.cyberduck.core.ctera;
/*
* Copyright (c) 2002-2025 iterate GmbH. All rights reserved.
* Copyright (c) 2002-2026 iterate GmbH. All rights reserved.
* https://cyberduck.io/
*
* This program is free software; you can redistribute it and/or modify
@@ -21,6 +21,7 @@ import ch.cyberduck.core.Path;
import ch.cyberduck.core.ctera.model.DirectIO;
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.features.VersionIdProvider;
import ch.cyberduck.core.http.HttpExceptionMappingService;
import ch.cyberduck.core.shared.DisabledBulkFeature;
import ch.cyberduck.core.transfer.Transfer;
import ch.cyberduck.core.transfer.TransferItem;
@@ -33,8 +34,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,6 +42,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class CteraBulkFeature extends DisabledBulkFeature {
private static final Logger log = LogManager.getLogger(CteraBulkFeature.class);
public static final String DIRECTIO_PARAMETER = "directio";
private final CteraSession session;
private final VersionIdProvider versionid;
@@ -57,46 +59,11 @@ public class CteraBulkFeature extends DisabledBulkFeature {
break;
case download:
for(Map.Entry<TransferItem, TransferStatus> file : files.entrySet()) {
final DirectIO metadata;
try {
metadata = this.getMetadata(file.getKey().remote);
}
catch(IOException e) {
log.warn("Ignore DirectIO download failure {} for {}", e, file.getKey().remote);
continue;
}
final DirectIO metadata = this.getMetadata(file.getKey().remote);
log.debug("DirectIO metadata {} retrieved for {}", metadata, file.getKey().remote);
final TransferStatus status = file.getValue();
if(status.isSegmented()) {
final List<TransferStatus> segments = status.getSegments();
if(segments.size() <= metadata.chunks.size()) {
for(int i = 0; i < segments.size(); i++) {
final TransferStatus segment = segments.get(i);
if(i == 0) {
if(segment.getOffset() > 0) {
log.warn("DirectIO download for {} with an initial offset is not supported", file.getKey().remote);
continue;
}
}
segment.setUrl(metadata.chunks.get(i).url);
final Map<String, String> parameters = new HashMap<>(segment.getParameters());
parameters.put(CteraDirectIOReadFeature.CTERA_WRAPPEDKEY, metadata.encrypt_info.wrapped_key);
segment.setParameters(parameters);
}
}
else {
log.error("Mismatch between number of segments ({}) and chunks ({}) for {}",
segments.size(), metadata.chunks.size(), file.getKey().remote);
}
}
else {
if(metadata.actual_blocks_range.file_size == 0) {
final Map<String, String> parameters = new HashMap<>(status.getParameters());
parameters.put(CteraDirectIOReadFeature.CTERA_WRAPPEDKEY, metadata.encrypt_info.wrapped_key);
status.setParameters(parameters);
}
else {
log.warn("DirectIO download for {} with an initial offset is not supported", file.getKey().remote);
}
for(TransferStatus segment : status.getSegments()) {
segment.setParameters(Collections.singletonMap(DIRECTIO_PARAMETER, metadata));
}
}
break;
@@ -104,17 +71,20 @@ public class CteraBulkFeature extends DisabledBulkFeature {
return files;
}
private DirectIO getMetadata(final Path file) throws IOException, BackgroundException {
private DirectIO getMetadata(final Path file) throws BackgroundException {
final HttpGet request = new HttpGet(String.format("%s%s%s", new HostUrlProvider().withUsername(false).withPath(false)
.get(session.getHost()), CteraDirectIOInterceptor.DIRECTIO_PATH, versionid.getVersionId(file)));
final DirectIO metadata = session.getClient().getClient().execute(request, new AbstractResponseHandler<DirectIO>() {
@Override
public DirectIO handleEntity(final HttpEntity entity) throws IOException {
final ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(entity.getContent(), DirectIO.class);
}
});
log.debug("DirectIO metadata {} retrieved for {}", metadata, file);
return metadata;
try {
return session.getClient().getClient().execute(request, new AbstractResponseHandler<DirectIO>() {
@Override
public DirectIO handleEntity(final HttpEntity entity) throws IOException {
final ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(entity.getContent(), DirectIO.class);
}
});
}
catch(IOException e) {
throw new HttpExceptionMappingService().map("Download {0} failed", e, file);
}
}
}
@@ -19,9 +19,10 @@ import ch.cyberduck.core.ConnectionCallback;
import ch.cyberduck.core.Path;
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.features.Read;
import ch.cyberduck.core.features.VersionIdProvider;
import ch.cyberduck.core.preferences.HostPreferencesFactory;
import ch.cyberduck.core.transfer.TransferStatus;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -32,32 +33,50 @@ public class CteraDelegatingReadFeature implements Read {
private static final Logger log = LogManager.getLogger(CteraDelegatingReadFeature.class);
private final CteraSession session;
private final VersionIdProvider versionid;
private final boolean directio;
public CteraDelegatingReadFeature(final CteraSession session) {
public CteraDelegatingReadFeature(final CteraSession session, final VersionIdProvider versionid) {
this.session = session;
this.versionid = versionid;
this.directio = HostPreferencesFactory.get(session.getHost()).getBoolean("ctera.download.directio.enable");
}
@Override
public InputStream read(final Path file, final TransferStatus status, final ConnectionCallback callback) throws BackgroundException {
if(StringUtils.isNotBlank(status.getParameters().get(CteraDirectIOReadFeature.CTERA_WRAPPEDKEY))) {
return new CteraDirectIOReadFeature(session).read(file, status, callback);
if(directio) {
try {
return new CteraDirectIOReadFeature(session).read(file, status, callback);
}
catch(BackgroundException e) {
log.warn("Ignore DirectIO retrieval failure {} for {}", e, file);
}
}
log.warn("No key material found in status {} for {}", status, file);
return new CteraReadFeature(session).read(file, status, callback);
}
@Override
public boolean offset(final Path file) throws BackgroundException {
if(directio) {
return new CteraDirectIOReadFeature(session).offset(file);
}
return new CteraReadFeature(session).offset(file);
}
@Override
public void preflight(final Path file) throws BackgroundException {
if(directio) {
new CteraDirectIOReadFeature(session).preflight(file);
return;
}
new CteraReadFeature(session).preflight(file);
}
@Override
public EnumSet<Flags> features(final Path file) {
if(directio) {
return new CteraDirectIOReadFeature(session).features(file);
}
return new CteraReadFeature(session).features(file);
}
}
@@ -18,7 +18,6 @@ package ch.cyberduck.core.ctera;
import ch.cyberduck.core.ConnectionCallback;
import ch.cyberduck.core.Path;
import ch.cyberduck.core.ctera.directio.DirectIOInputStream;
import ch.cyberduck.core.ctera.directio.EncryptInfo;
import ch.cyberduck.core.ctera.model.DirectIO;
import ch.cyberduck.core.exception.BackgroundException;
import ch.cyberduck.core.features.Read;
@@ -27,7 +26,6 @@ import ch.cyberduck.core.http.HttpMethodReleaseInputStream;
import ch.cyberduck.core.transfer.TransferStatus;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -35,15 +33,15 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.List;
import static ch.cyberduck.core.ctera.CteraAttributesFinderFeature.READPERMISSION;
import static ch.cyberduck.core.ctera.CteraAttributesFinderFeature.assumeRole;
public class CteraDirectIOReadFeature implements Read {
private static final Logger log = LogManager.getLogger(CteraDirectIOReadFeature.class);
public static final String CTERA_WRAPPEDKEY = "wrapped_key";
private final CteraSession session;
public CteraDirectIOReadFeature(final CteraSession session) {
@@ -53,15 +51,13 @@ public class CteraDirectIOReadFeature implements Read {
@Override
public InputStream read(final Path file, final TransferStatus status, final ConnectionCallback callback) throws BackgroundException {
try {
final EncryptInfo key = new EncryptInfo(status.getParameters().get(CTERA_WRAPPEDKEY), session.getOrCreateAPIKeys().secretKey);
final DirectIO metadata = (DirectIO) status.getParameters().get(CteraBulkFeature.DIRECTIO_PARAMETER);
log.debug("DirectIO metadata {} retrieved for {}", metadata, file);
final String secretKey = session.getOrCreateAPIKeys().secretKey;
if(status.getLength() == 0) {
return new ChunkSequenceInputStream(Collections.emptyList(), key);
return new ChunkSequenceInputStream(Collections.emptyList(), metadata.encrypt_info, secretKey, status.getOffset());
}
final DirectIO.Chunk chunk = new DirectIO.Chunk();
chunk.url = status.getUrl();
chunk.len = status.getLength();
log.debug("Return chunk {} for file {}", chunk, file);
return new ChunkSequenceInputStream(Collections.singletonList(chunk), key);
return new ChunkSequenceInputStream(metadata.chunks, metadata.encrypt_info, secretKey, status.getOffset());
}
catch(IOException e) {
throw new HttpExceptionMappingService().map("Download {0} failed", e, file);
@@ -69,43 +65,63 @@ public class CteraDirectIOReadFeature implements Read {
}
@Override
public EnumSet<Flags> features(final Path file) {
return EnumSet.noneOf(Flags.class);
public void preflight(final Path file) throws BackgroundException {
assumeRole(file, READPERMISSION);
}
private final class ChunkSequenceInputStream extends InputStream {
private final Enumeration<DirectIO.Chunk> chunks;
private final EncryptInfo key;
private InputStream in;
private final DirectIO.EncryptInfo key;
private final String secretKey;
private final long offset;
public ChunkSequenceInputStream(final List<DirectIO.Chunk> chunks, final EncryptInfo key) throws IOException {
private InputStream in;
private long currentPosition = 0L;
public ChunkSequenceInputStream(final List<DirectIO.Chunk> chunks, final DirectIO.EncryptInfo key, final String secretKey, final long offset) throws IOException {
this.chunks = Collections.enumeration(chunks);
this.key = key;
this.peekNextStream();
this.secretKey = secretKey;
this.offset = offset;
this.peek();
}
private void nextStream() throws IOException {
if(in != null) {
in.close();
}
this.peekNextStream();
this.peek();
}
private void peekNextStream() throws IOException {
if(chunks.hasMoreElements()) {
in = getStream(chunks.nextElement());
/**
* Peek at the next chunk in the sequence
*/
private void peek() throws IOException {
while(chunks.hasMoreElements()) {
final DirectIO.Chunk chunk = chunks.nextElement();
final long chunkStart = currentPosition;
final long chunkEnd = currentPosition + chunk.len;
// Skip chunks that are entirely before the offset
if(chunkEnd <= offset) {
log.debug("Skipping chunk {} entirely before offset {}", chunk, offset);
currentPosition = chunkEnd;
continue;
}
log.debug("Request chunk {}", chunk);
// Open the stream for this chunk
in = new DirectIOInputStream(new HttpMethodReleaseInputStream(
session.getClient().getClient().execute(new HttpGet(chunk.url)),
new TransferStatus().setOffset(0L).setLength(chunk.len)), key, secretKey);
// If this chunk contains the offset, skip bytes before the offset
if(chunkStart < offset) {
final long bytesToSkip = offset - chunkStart;
log.debug("Skipping {} bytes in chunk {} to reach offset {}", bytesToSkip, chunk, offset);
IOUtils.skip(in, bytesToSkip);
}
currentPosition = chunkEnd;
return;
}
else {
in = null;
}
}
private InputStream getStream(final DirectIO.Chunk chunk) throws IOException {
log.debug("Request chunk {}", chunk);
final HttpGet chunkRequest = new HttpGet(chunk.url);
final HttpResponse chunkResponse = session.getClient().getClient().execute(chunkRequest);
return new DirectIOInputStream(new HttpMethodReleaseInputStream(chunkResponse, new TransferStatus()), key);
in = null;
}
@Override
@@ -26,9 +26,6 @@ import ch.cyberduck.core.synchronization.ComparisonService;
import ch.cyberduck.core.synchronization.DefaultComparisonService;
import ch.cyberduck.core.synchronization.ETagComparisonService;
import java.util.HashMap;
import java.util.Map;
import com.google.auto.service.AutoService;
@AutoService(Protocol.class)
@@ -36,7 +33,6 @@ public class CteraProtocol extends AbstractProtocol {
public static final String CTERA_REDIRECT_URI = String.format("%s:websso",
PreferencesFactory.get().getProperty("oauth.handler.scheme"));
private static final int DIRECTIO_CHUNKSIZE = 4194304;
@Override
public Type getType() {
@@ -102,17 +98,6 @@ public class CteraProtocol extends AbstractProtocol {
return "CTERA Token";
}
@Override
public Map<String, String> getProperties() {
final Map<String, String> properties = new HashMap<>();
if(PreferencesFactory.get().getBoolean("ctera.download.directio.enable")) {
properties.put("queue.download.segments.size.dynamic", String.valueOf(false));
properties.put("queue.download.segments.size", String.valueOf(DIRECTIO_CHUNKSIZE));
properties.put("queue.download.segments.threshold", String.valueOf(0));
}
return properties;
}
@Override
@SuppressWarnings("unchecked")
public <T> T getFeature(final Class<T> type) {
@@ -181,10 +181,8 @@ public class CteraSession extends DAVSession {
final HttpPost post = new HttpPost(API_PATH);
try {
final String userId = this.getPortalSession().getUserIdFromUserRef();
post.setEntity(
new StringEntity(String.format("<obj><att id=\"type\"><val>user-defined</val></att><att id=\"name\"><val>createApiKey</val></att><att id=\"param\"><val>%s</val></att></obj>",
userId), ContentType.TEXT_XML
)
post.setEntity(new StringEntity(String.format("<obj><att id=\"type\"><val>user-defined</val></att><att id=\"name\"><val>createApiKey</val></att><att id=\"param\"><val>%s</val></att></obj>", userId),
ContentType.TEXT_XML)
);
final APICredentials credentials = this.getClient().execute(post, new AbstractResponseHandler<APICredentials>() {
@Override
@@ -279,10 +277,7 @@ public class CteraSession extends DAVSession {
return (T) new CteraListService(this);
}
if(type == Read.class) {
if(preferences.getBoolean("ctera.download.directio.enable")) {
return (T) new CteraDelegatingReadFeature(this);
}
return (T) new CteraReadFeature(this);
return (T) new CteraDelegatingReadFeature(this, versionid);
}
if(type == Write.class) {
return (T) new CteraWriteFeature(this);
@@ -15,6 +15,8 @@ package ch.cyberduck.core.ctera.directio;
* GNU General Public License for more details.
*/
import ch.cyberduck.core.ctera.model.DirectIO;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
@@ -44,10 +46,10 @@ public class Decryptor {
private static final byte[] GZIP_MAGIC = {0x1F, (byte) 0x8B};
private static final byte[] SNAPPY_MAGIC = {-126, 83, 78, 65, 80, 80, 89, 0};
public InputStream decryptData(final InputStream blockData, final EncryptInfo encryptInfo) throws IOException {
public InputStream decryptData(final InputStream blockData, final DirectIO.EncryptInfo encryptInfo, final String secretKey) throws IOException {
try {
final DecryptKey decryptKey = new DecryptKey(encryptInfo.getWrappedKey());
decryptKey.decrypt(encryptInfo.getWrappingKey());
final DecryptKey decryptKey = new DecryptKey(encryptInfo.wrapped_key);
decryptKey.decrypt(secretKey);
final SecretKeySpec key = new SecretKeySpec(Base64.decodeBase64(decryptKey.getDecryptedKey()), ENCRYPTION_KEY_ALGORITHM);
blockData.read();
final byte[] iv = new byte[16];
@@ -15,6 +15,8 @@ package ch.cyberduck.core.ctera.directio;
* GNU General Public License for more details.
*/
import ch.cyberduck.core.ctera.model.DirectIO;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.ProxyInputStream;
@@ -23,14 +25,12 @@ import java.io.InputStream;
public class DirectIOInputStream extends ProxyInputStream {
private InputStream decryptedInputStream;
private final Decryptor decryptor;
private final EncryptInfo encryptInfo;
private final InputStream decryptedInputStream;
public DirectIOInputStream(final InputStream proxy, final EncryptInfo encryptInfo) {
public DirectIOInputStream(final InputStream proxy, final DirectIO.EncryptInfo encryptInfo, final String secretKey) throws IOException {
super(proxy);
this.decryptor = new Decryptor();
this.encryptInfo = encryptInfo;
final Decryptor decryptor = new Decryptor();
this.decryptedInputStream = decryptor.decryptData(in, encryptInfo, secretKey);
}
@Override
@@ -46,23 +46,12 @@ public class DirectIOInputStream extends ProxyInputStream {
@Override
public int read(final byte[] dst, final int off, final int len) throws IOException {
this.initStream();
return decryptedInputStream.read(dst, off, len);
}
private void initStream() throws IOException {
if(decryptedInputStream == null) {
this.readNextChunk();
}
}
@Override
public long skip(final long len) throws IOException {
return IOUtils.skip(this, len);
}
private void readNextChunk() throws IOException {
decryptedInputStream = decryptor.decryptData(this.in, encryptInfo);
}
}
@@ -1,35 +0,0 @@
package ch.cyberduck.core.ctera.directio;
/*
* Copyright (c) 2002-2025 iterate GmbH. All rights reserved.
* https://cyberduck.io/
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
public class EncryptInfo {
public EncryptInfo(final String wrappedKey, final String wrappingKey) {
this.wrappedKey = wrappedKey;
this.wrappingKey = wrappingKey;
}
private String wrappedKey;
private String wrappingKey;
public String getWrappedKey() {
return wrappedKey;
}
public String getWrappingKey() {
return wrappingKey;
}
}
@@ -29,6 +29,5 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
public final class APICredentials {
public String accessKey;
public String secretKey;
}
@@ -41,7 +41,6 @@ public final class DirectIO {
}
public static class EncryptInfo {
public String wrapped_key;
public boolean data_encrypted;
@@ -56,7 +55,6 @@ public final class DirectIO {
}
public static class ActualBlocksRange {
public long file_size;
public String range;
@@ -21,7 +21,6 @@ import ch.cyberduck.core.HostKeyCallback;
import ch.cyberduck.core.LoginCallback;
import ch.cyberduck.core.LoginConnectionService;
import ch.cyberduck.core.ProgressListener;
import ch.cyberduck.core.preferences.PreferencesFactory;
import ch.cyberduck.core.proxy.DisabledProxyFinder;
import ch.cyberduck.core.ssl.DefaultX509KeyManager;
import ch.cyberduck.core.ssl.DisabledX509TrustManager;
@@ -34,10 +33,12 @@ import org.junit.Before;
public class AbstractCteraDirectIOTest extends VaultTest {
protected CteraSession session;
private TestPasswordStore keychain;
@After
public void disconnect() throws Exception {
session.close();
keychain.save(session.getHost());
}
@Before
@@ -52,10 +53,10 @@ public class AbstractCteraDirectIOTest extends VaultTest {
}
};
host.setDefaultPath("/ServicesPortal/webdav/My Files");
PreferencesFactory.get().setDefault("ctera.download.directio.enable", String.valueOf(true));
session = new CteraSession(host, new DisabledX509TrustManager(), new DefaultX509KeyManager(), new TestPasswordStore());
keychain = new TestPasswordStore();
session = new CteraSession(host, new DisabledX509TrustManager(), new DefaultX509KeyManager(), keychain);
final LoginConnectionService connect = new LoginConnectionService(LoginCallback.noop, HostKeyCallback.noop,
new TestPasswordStore(), ProgressListener.noop, new DisabledProxyFinder());
keychain, ProgressListener.noop, new DisabledProxyFinder());
connect.check(session, CancelCallback.noop);
}
}
@@ -36,6 +36,7 @@ import ch.cyberduck.core.transfer.TransferStatus;
import ch.cyberduck.test.IntegrationTest;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -52,7 +53,7 @@ import static org.junit.Assert.*;
public class CteraDirectIOReadFeatureTest extends AbstractCteraDirectIOTest {
@Test
public void testReadChunk() throws Exception {
public void testReadSingleChunk() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(), new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(65536);
@@ -67,10 +68,8 @@ public class CteraDirectIOReadFeatureTest extends AbstractCteraDirectIOTest {
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setLength(content.length);
status.setSegments(Collections.singletonList(segment));
final CteraBulkFeature bulk = new CteraBulkFeature(session, new DefaultVersionIdProvider(session));
bulk.pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test), status), ConnectionCallback.noop);
assertNotNull(segment.getUrl());
assertNotNull(segment.getParameters());
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
@@ -80,6 +79,174 @@ public class CteraDirectIOReadFeatureTest extends AbstractCteraDirectIOTest {
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadSingleChunkWithOffset() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(),
new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(65536);
final OutputStream out = local.getOutputStream(false);
assertNotNull(out);
IOUtils.write(content, out);
out.close();
new DAVUploadFeature(session).upload(
new CteraWriteFeature(session), test, local, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), ProgressListener.noop, StreamListener.noop,
new TransferStatus().setLength(content.length),
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setLength(content.length - 1).setOffset(1L);
status.setSegments(Collections.singletonList(segment));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
new StreamCopier(segment, segment).transfer(in, buffer);
in.close();
assertArrayEquals(ArrayUtils.subarray(content, 1, content.length), buffer.toByteArray());
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadSingleChunkEqualDefaultChunksize() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(),
new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(4194304);
final OutputStream out = local.getOutputStream(false);
assertNotNull(out);
IOUtils.write(content, out);
out.close();
new DAVUploadFeature(session).upload(
new CteraWriteFeature(session), test, local, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), ProgressListener.noop, StreamListener.noop,
new TransferStatus().setLength(content.length),
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setLength(content.length);
status.setSegments(Collections.singletonList(segment));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
new StreamCopier(segment, segment).transfer(in, buffer);
in.close();
assertArrayEquals(content, buffer.toByteArray());
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadSingleChunkEqualDefaultChunksizeWithOffset() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(),
new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(4194304);
final OutputStream out = local.getOutputStream(false);
assertNotNull(out);
IOUtils.write(content, out);
out.close();
new DAVUploadFeature(session).upload(
new CteraWriteFeature(session), test, local, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), ProgressListener.noop, StreamListener.noop,
new TransferStatus().setLength(content.length),
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setLength(1L).setOffset(content.length - 1L);
status.setSegments(Collections.singletonList(segment));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
new StreamCopier(segment, segment).transfer(in, buffer);
in.close();
assertArrayEquals(ArrayUtils.subarray(content, content.length - 1, content.length), buffer.toByteArray());
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadMultipleChunkSizeAligned() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(),
new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(4194304 * 2);
final OutputStream out = local.getOutputStream(false);
assertNotNull(out);
IOUtils.write(content, out);
out.close();
new DAVUploadFeature(session).upload(
new CteraWriteFeature(session), test, local, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), ProgressListener.noop, StreamListener.noop,
new TransferStatus().setLength(content.length),
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setLength(content.length);
status.setSegments(Collections.singletonList(segment));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
new StreamCopier(segment, segment).transfer(in, buffer);
in.close();
assertArrayEquals(content, buffer.toByteArray());
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadMultipleChunkSize() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(),
new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(8388609);
final OutputStream out = local.getOutputStream(false);
assertNotNull(out);
IOUtils.write(content, out);
out.close();
new DAVUploadFeature(session).upload(
new CteraWriteFeature(session), test, local, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), ProgressListener.noop, StreamListener.noop,
new TransferStatus().setLength(content.length),
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setLength(content.length);
status.setSegments(Collections.singletonList(segment));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
new StreamCopier(segment, segment).transfer(in, buffer);
in.close();
assertArrayEquals(content, buffer.toByteArray());
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadMultipleChunkSizeWithOffset() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(),
new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
final Local local = new Local(System.getProperty("java.io.tmpdir"), new AlphanumericRandomStringService().random());
final byte[] content = RandomUtils.nextBytes(8388609);
final OutputStream out = local.getOutputStream(false);
assertNotNull(out);
IOUtils.write(content, out);
out.close();
new DAVUploadFeature(session).upload(
new CteraWriteFeature(session), test, local, new BandwidthThrottle(BandwidthThrottle.UNLIMITED), ProgressListener.noop, StreamListener.noop,
new TransferStatus().setLength(content.length),
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus();
final TransferStatus segment = new TransferStatus().setSegment(true).setOffset(4194304L).setLength(content.length - 4194304);
status.setSegments(Collections.singletonList(segment));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
final InputStream in = new CteraDirectIOReadFeature(session).read(test, segment, ConnectionCallback.noop);
assertNotNull(in);
final ByteArrayOutputStream buffer = new ByteArrayOutputStream(content.length);
new StreamCopier(segment, segment).transfer(in, buffer);
in.close();
assertArrayEquals(ArrayUtils.subarray(content, 4194304, content.length), buffer.toByteArray());
new CteraDeleteFeature(session).delete(Collections.singletonList(test), LoginCallback.noop, new Delete.DisabledCallback());
}
@Test
public void testReadZeroByteFile() throws Exception {
final Path test = new CteraTouchFeature(session).touch(new CteraWriteFeature(session), new Path(new DefaultHomeFinderService(session).find(), new AlphanumericRandomStringService().random(), EnumSet.of(Path.Type.file)), new TransferStatus());
@@ -95,10 +262,8 @@ public class CteraDirectIOReadFeatureTest extends AbstractCteraDirectIOTest {
ConnectionCallback.noop);
final TransferStatus status = new TransferStatus().setLength(content.length);
status.setSegments(Collections.emptyList());
final CteraBulkFeature bulk = new CteraBulkFeature(session, new DefaultVersionIdProvider(session));
bulk.pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test), status), ConnectionCallback.noop);
assertNull(status.getUrl());
assertNotNull(status.getParameters().get(CteraDirectIOReadFeature.CTERA_WRAPPEDKEY));
final DefaultVersionIdProvider versionid = new DefaultVersionIdProvider(session);
new CteraBulkFeature(session, versionid).pre(Transfer.Type.download, Collections.singletonMap(new TransferItem(test, local), status), ConnectionCallback.noop);
assertTrue(new DAVFindFeature(session).find(test));
final PathAttributes attributes = new CteraAttributesFinderFeature(session).find(test);
assertEquals(content.length, attributes.getSize());
@@ -83,7 +83,7 @@ public class EueWriteFeature extends AbstractHttpWriteFeature<EueWriteFeature.Ch
}
else {
uploadUri = status.getUrl();
resourceId = status.getParameters().get(RESOURCE_ID);
resourceId = status.getParameters().get(RESOURCE_ID).toString();
}
final HttpResponseOutputStream<Chunk> stream = this.write(file, status,
new DelayedHttpEntityCallable<Chunk>(file) {
@@ -52,6 +52,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class S3WriteFeature extends AbstractHttpWriteFeature<StorageObject> implements Write<StorageObject> {
private static final Logger log = LogManager.getLogger(S3WriteFeature.class);
@@ -77,7 +78,9 @@ public class S3WriteFeature extends AbstractHttpWriteFeature<StorageObject> impl
final RequestEntityRestStorageService client = session.getClient();
final Path bucket = containerService.getContainer(file);
client.putObjectWithRequestEntityImpl(
bucket.isRoot() ? StringUtils.EMPTY : bucket.getName(), object, entity, status.getParameters());
bucket.isRoot() ? StringUtils.EMPTY : bucket.getName(), object, entity,
status.getParameters().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
log.debug("Saved object {} with checksum {}", file, object.getETag());
}
catch(ServiceException e) {
@@ -156,7 +156,7 @@ public class SpectraBulkService implements Bulk<Set<UUID>> {
for(Map.Entry<TransferItem, TransferStatus> item : files.entrySet()) {
if(container.getKey().equals(containerService.getContainer(item.getKey().remote))) {
final TransferStatus status = item.getValue();
final Map<String, String> parameters = new HashMap<>(status.getParameters());
final Map<String, Object> parameters = new HashMap<>(status.getParameters());
parameters.put(REQUEST_PARAMETER_JOBID_IDENTIFIER, master.getJobId().toString());
status.setParameters(parameters);
status.setPart(counters.get(containerService.getKey(item.getKey().remote)));
@@ -191,7 +191,7 @@ public class SpectraBulkService implements Bulk<Set<UUID>> {
if(!status.getParameters().containsKey(REQUEST_PARAMETER_JOBID_IDENTIFIER)) {
throw new NotfoundException(String.format("Missing job id parameter in status for %s", file.getName()));
}
final String job = status.getParameters().get(REQUEST_PARAMETER_JOBID_IDENTIFIER);
final String job = status.getParameters().get(REQUEST_PARAMETER_JOBID_IDENTIFIER).toString();
log.debug("Cancel job {}", job);
final Ds3Client client = new SpectraClientBuilder().wrap(session, session.getHost());
client.cancelJobSpectraS3(new CancelJobSpectraS3Request(job));
@@ -224,7 +224,7 @@ public class SpectraBulkService implements Bulk<Set<UUID>> {
if(!status.getParameters().containsKey(REQUEST_PARAMETER_JOBID_IDENTIFIER)) {
throw new NotfoundException(String.format("Missing job id parameter in status for %s", file.getName()));
}
final String job = status.getParameters().get(REQUEST_PARAMETER_JOBID_IDENTIFIER);
final String job = status.getParameters().get(REQUEST_PARAMETER_JOBID_IDENTIFIER).toString();
log.debug("Query status for job {}", job);
// Fetch current list from server
final Ds3Client client = new SpectraClientBuilder().wrap(session, session.getHost());
@@ -305,7 +305,7 @@ public class SpectraBulkService implements Bulk<Set<UUID>> {
chunk.setLength(object.getLength());
chunk.setOffset(object.getOffset());
// Job parameter already present from #pre
final Map<String, String> parameters = new HashMap<>(chunk.getParameters());
final Map<String, Object> parameters = new HashMap<>(chunk.getParameters());
// Set offset for chunk.
parameters.put(REQUEST_PARAMETER_OFFSET, Long.toString(chunk.getOffset()));
chunk.setParameters(parameters);
@@ -34,6 +34,8 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SpectraReadFeature implements Read {
@@ -64,19 +66,20 @@ public class SpectraReadFeature implements Read {
public InputStream open() throws IOException {
try {
return session.getClient().getObjectImpl(
false,
containerService.getContainer(file).getName(),
containerService.getKey(file),
null,
null,
null,
null,
null,
null,
file.attributes().getVersionId(),
new HashMap<String, Object>(),
chunk.getParameters())
.getDataInputStream();
false,
containerService.getContainer(file).getName(),
containerService.getKey(file),
null,
null,
null,
null,
null,
null,
file.attributes().getVersionId(),
new HashMap<>(),
chunk.getParameters().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())))
.getDataInputStream();
}
catch(ServiceException e) {
throw new IOException(e.getMessage(), e);
@@ -64,7 +64,7 @@ public class TusWriteFeature extends AbstractHttpWriteFeature<Void> {
final DelayedHttpEntityCallable<Void> command = new DelayedHttpEntityCallable<Void>(file) {
@Override
public Void call(final HttpEntity entity) throws BackgroundException {
final HttpPatch request = new HttpPatch(status.getParameters().get(TusUploadFeature.UPLOAD_URL));
final HttpPatch request = new HttpPatch(status.getParameters().get(TusUploadFeature.UPLOAD_URL).toString());
request.setEntity(entity);
request.setHeader(TUS_HEADER_RESUMABLE, TUS_VERSION);
final Checksum checksum = status.getChecksum();
@@ -117,13 +117,13 @@ public class DAVReadFeature implements Read {
if(!status.getParameters().isEmpty()) {
resource.append("?");
}
for(Map.Entry<String, String> parameter : status.getParameters().entrySet()) {
for(Map.Entry<String, ?> parameter : status.getParameters().entrySet()) {
if(!resource.toString().endsWith("?")) {
resource.append("&");
}
resource.append(URIEncoder.encode(parameter.getKey()))
.append("=")
.append(URIEncoder.encode(parameter.getValue()));
.append(URIEncoder.encode(parameter.getValue().toString()));
}
return new HttpGet(resource.toString());