Skip to content
This repository was archived by the owner on Jun 7, 2023. It is now read-only.

Commit ca9b940

Browse files
author
Rustam Aliyev
committedApr 8, 2013
Use C* as a blob storage for small emails. Major refactoring of blob
storage. fixed #32
1 parent 583a6da commit ca9b940

34 files changed

+1300
-248
lines changed
 

‎config/elasticinbox.cml

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ CREATE COLUMN FAMILY MessageMetadata WITH
2626
caching = keys_only AND
2727
comment='Message metadata including headers, labels, markers, physical location, etc.';
2828

29+
CREATE COLUMN FAMILY MessageBlob WITH
30+
key_validation_class = 'CompositeType(TimeUUIDType, Int32Type)' AND
31+
comparator = Int32Type AND
32+
caching = keys_only AND
33+
comment='Chunked message blobs';
34+
2935
CREATE COLUMN FAMILY IndexLabels WITH
3036
key_validation_class = UTF8Type AND
3137
comparator = TimeUUIDType AND

‎config/elasticinbox.yaml

+12-5
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ lmtp_enable_pop3: true
2626
pop3_port: 2110
2727
pop3_max_connections: 20
2828

29-
### Metadata storage driver
30-
# currently only "cassandra" supported
31-
metadata_storage_driver: cassandra
29+
### Database settings
30+
# Database used for storing metadata of accounts, labels and messages.
31+
# Optionally, it is possible to store blobs in the database (see database_blob_max_threshold).
32+
# Currently only "cassandra" is supported.
33+
database_driver: cassandra
3234

3335
# If you want to store parsed HTML and/or PLAIN text of the message body
3436
# in the metadata storage, enable below.
@@ -38,6 +40,11 @@ metadata_storage_driver: cassandra
3840
store_html_message: true
3941
store_plain_message: false
4042

43+
# Maximum blob size in bytes which can be stored in the database.
44+
# Blobs larger than this value will be stored with the deafult blob profile (blobstore_write_profile).
45+
# Set to 0 to disable using database for blobs. Currently value can't be more than 128K.
46+
database_blob_max_size: 65536
47+
4148
### Cassandra settings
4249
# Specify Cassandra hosts (multiple for LB), cluster name, keyspace
4350
# and auto discovery
@@ -85,10 +92,10 @@ blobstore_profiles:
8592
# only one profile can be used for writing at the same time
8693
blobstore_write_profile: fs-local
8794

88-
# compress objects written to the blob store
95+
# compress objects written to the blob store (including database blobs)
8996
blobstore_enable_compression: true
9097

91-
# encrypt objects written to the blob store
98+
# encrypt objects written to the blob store (excluding database blobs)
9299
blobstore_enable_encryption: false
93100
#blobstore_default_encryption_key: mykey1
94101

‎itests/src/test/java/com/elasticinbox/itests/AbstractIntegrationTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static org.ops4j.pax.exam.CoreOptions.felix;
55
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
66
import static org.ops4j.pax.exam.CoreOptions.options;
7-
import static org.ops4j.pax.exam.CoreOptions.repository;
87
import static org.ops4j.pax.exam.CoreOptions.scanDir;
98
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
109
import static org.ops4j.pax.exam.CoreOptions.workingDirectory;
@@ -46,7 +45,7 @@ public Option[] config()
4645
//junitBundles(),
4746
felix().version("3.2.2"),
4847
workingDirectory("target/paxrunner/"),
49-
repository("https://repository.apache.org/snapshots/").allowSnapshots(),
48+
//repository("https://repository.apache.org/snapshots/").allowSnapshots(),
5049

5150
// Configs
5251
systemProperty("elasticinbox.config").value("../test-classes/elasticinbox.yaml"),

‎itests/src/test/java/com/elasticinbox/itests/Pop3IT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public Option[] config()
9494
public void testListUidl() throws IOException
9595
{
9696
initAccount();
97-
97+
9898
// load messages with POP3 label
9999
long mailSizeRegular = getResourceSize(EMAIL_REGULAR);
100100
long mailSizeAttach = getResourceSize(EMAIL_LARGE_ATT);

‎itests/src/test/resources/elasticinbox.yaml

+10-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ lmtp_enable_pop3: true
2626
pop3_port: 2110
2727
pop3_max_connections: 20
2828

29-
### Metadata storage driver
30-
# currently only "cassandra" supported
31-
metadata_storage_driver: cassandra
29+
### Database settings
30+
# Database used for storing metadata of accounts, labels and messages.
31+
# Optionally, it is possible to store blobs in the database (see database_blob_max_threshold).
32+
# Currently only "cassandra" is supported.
33+
database_driver: cassandra
3234

3335
# If you want to store parsed HTML and/or PLAIN text of the message body
3436
# in the metadata storage, enable below.
@@ -38,6 +40,11 @@ metadata_storage_driver: cassandra
3840
store_html_message: true
3941
store_plain_message: false
4042

43+
# Maximum blob size in bytes which can be stored in the database.
44+
# Blobs larger than this value will be stored with the deafult blob profile (blobstore_write_profile).
45+
# Set to 0 to disable using database for blobs. Currently value can't be more than 128K.
46+
database_blob_max_size: 65536
47+
4148
### Cassandra settings
4249
# Specify Cassandra hosts (multiple for LB), cluster name, keyspace
4350
# and auto discovery

‎modules/config/src/main/java/com/elasticinbox/config/Config.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ public class Config
5353
public Integer pop3_port;
5454
public Integer pop3_max_connections;
5555

56-
// metadata storage settings
57-
public String metadata_storage_driver;
56+
// Database settings
57+
public String database_driver;
5858
public Boolean store_html_message;
5959
public Boolean store_plain_message;
60+
public Long database_blob_max_size;
6061

6162
// Cassandra settings
6263
public List<String> cassandra_hosts;

‎modules/config/src/main/java/com/elasticinbox/config/Configurator.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,18 @@ static URI getStorageConfigURL() throws ConfigurationException
112112
// TODO: add config verification here
113113
// ...
114114

115+
// verify max database blob size
116+
if(conf.database_blob_max_size > DatabaseConstants.MAX_BLOB_SIZE) {
117+
throw new ConfigurationException("Blobs larger than "
118+
+ DatabaseConstants.MAX_BLOB_SIZE + " bytes cannot be stored in the database");
119+
}
120+
121+
// verify that blobstore profile name is not conflicting with internal name
122+
if (conf.blobstore_profiles.containsKey(DatabaseConstants.DATABASE_PROFILE)) {
123+
throw new ConfigurationException("BlobStore profile name cannot be '"
124+
+ DatabaseConstants.DATABASE_PROFILE + "'");
125+
}
126+
115127
// verify that default blobstore profile exists
116128
if (!conf.blobstore_profiles.containsKey(conf.blobstore_write_profile)) {
117129
throw new ConfigurationException("Default BlobStore Profile '"
@@ -167,8 +179,12 @@ public static Integer getPop3MaxConnections() {
167179
return conf.pop3_max_connections;
168180
}
169181

170-
public static String getMetadataStorageDriver() {
171-
return (conf.metadata_storage_driver.equalsIgnoreCase("cassandra")) ? "CASSANDRA" : "UNKNOWN";
182+
public static String getDatabaseDriver() {
183+
return (conf.database_driver.equalsIgnoreCase("cassandra")) ? "CASSANDRA" : "UNKNOWN";
184+
}
185+
186+
public static Long getDatabaseBlobMaxSize() {
187+
return conf.database_blob_max_size;
172188
}
173189

174190
public static List<String> getCassandraHosts() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.config;
30+
31+
public class DatabaseConstants
32+
{
33+
/** Maximum blob size allowed in database. 128KB */
34+
public static long MAX_BLOB_SIZE = 0x20000;
35+
36+
/** Maximum blob block size. 128KB */
37+
public static long BLOB_BLOCK_SIZE = 0x20000;
38+
39+
/** Reserved profile name for internal database storage (e.g. Cassandra) */
40+
public static final String DATABASE_PROFILE = "db";
41+
}

‎modules/core/osgi.bnd

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ Export-Package: \
1313
com.elasticinbox.core.log
1414

1515
Private-Package: \
16-
com.elasticinbox.core.blob.store,\
17-
com.elasticinbox.core.blob.naming,\
16+
com.elasticinbox.core.blob.*,\
1817
com.elasticinbox.core.cassandra,\
1918
com.elasticinbox.core.cassandra.persistence,\
2019
com.elasticinbox.core.cassandra.utils

‎modules/core/src/main/java/com/elasticinbox/core/MessageDAO.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,9 @@ public void removeLabel(Mailbox mailbox, Set<Integer> labelIds, UUID messageId)
224224
*
225225
* @param mailbox
226226
* @param age
227+
* @throws IOException
227228
*/
228-
public void purge(Mailbox mailbox, Date age);
229+
public void purge(Mailbox mailbox, Date age) throws IOException;
229230

230231
/**
231232
* Calculates counters for all labels bu scanning through all messages. Used

‎modules/core/src/main/java/com/elasticinbox/core/blob/BlobDataSource.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
import java.io.InputStream;
3333
import java.net.URI;
3434

35-
import com.elasticinbox.common.utils.Assert;
35+
import com.elasticinbox.core.blob.compression.CompressionHandler;
36+
import com.elasticinbox.core.blob.compression.DeflateCompressionHandler;
3637
import com.elasticinbox.core.blob.store.BlobStoreConstants;
37-
import com.elasticinbox.core.blob.store.CompressionHandler;
3838

3939
/**
4040
* This class builds Blob data source from the given URI. It provides methods
@@ -45,17 +45,13 @@
4545
public class BlobDataSource
4646
{
4747
private final InputStream in;
48-
private final Boolean compressed;
49-
private final URI blobUri;
48+
private final BlobURI blobUri;
5049
private final CompressionHandler compressionHandler;
5150

5251
public BlobDataSource(final URI uri, final InputStream in, final CompressionHandler ch)
5352
{
54-
Assert.notNull(uri.getPath(), "Invalid blob URI provided, missing URI path: " + uri.toString());
55-
56-
this.blobUri = uri;
53+
this.blobUri = new BlobURI().fromURI(uri);
5754
this.in = in;
58-
this.compressed = uri.getPath().endsWith(BlobStoreConstants.COMPRESS_SUFFIX);
5955
this.compressionHandler = ch;
6056
}
6157

@@ -65,8 +61,13 @@ public BlobDataSource(final URI uri, final InputStream in, final CompressionHand
6561
*
6662
* @return
6763
*/
68-
public boolean isCompressed() {
69-
return compressed;
64+
public boolean isCompressed()
65+
{
66+
return ((blobUri.getCompression() != null && blobUri.getCompression()
67+
.equals(DeflateCompressionHandler.COMPRESSION_TYPE_DEFLATE)) ||
68+
// TODO: deprecated suffix based compression detection
69+
// kept for backward compatibility with 0.3
70+
blobUri.getName().endsWith(BlobStoreConstants.COMPRESS_SUFFIX));
7071
}
7172

7273
/**
@@ -93,15 +94,15 @@ public InputStream getInputStream() throws IOException {
9394
*/
9495
public InputStream getUncompressedInputStream() throws IOException
9596
{
96-
if (compressed && this.compressionHandler != null) {
97+
if (this.isCompressed() && this.compressionHandler != null) {
9798
return this.compressionHandler.uncompress(in);
9899
} else {
99100
return in;
100101
}
101102
}
102103

103104
public String getName() {
104-
return BlobUtils.relativize(blobUri.getPath());
105+
return blobUri.getName();
105106
}
106107

107108
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.blob;
30+
31+
import static com.elasticinbox.core.blob.store.BlobStoreConstants.*;
32+
33+
import java.net.URI;
34+
import java.net.URISyntaxException;
35+
36+
import me.prettyprint.cassandra.utils.Assert;
37+
38+
import org.jclouds.http.Uris;
39+
import org.jclouds.http.Uris.UriBuilder;
40+
import org.jclouds.http.utils.Queries;
41+
42+
import com.google.common.collect.Multimap;
43+
44+
/**
45+
* This class provides Blob URI builder and parser tools.
46+
* <p>
47+
* Blobs URI contains following parts:
48+
* <ul>
49+
* <li>Schema - always "blob"</li>
50+
* <li>Host - blob storage profile name</li>
51+
* <li>Path - path where blob stored</li>
52+
* <li>Query parameters - various blob attributes such as compression algorithm,
53+
* encryption key, block count, etc.</li>
54+
* </ul>
55+
* <p>
56+
* Examples:
57+
* <p>
58+
* <code>blob://db/f1ca99e0-99a0-11e2-95f0-040cced3bd7a?c=dfl&b=1</code>
59+
* <p>
60+
* <code>blob://aws3-bucket/f1ca99e0-99a0-11e2-95f0-040cced3bd7a:myemail?c=dfl&e=ekey2</code>
61+
*
62+
* @author Rustam Aliyev
63+
*/
64+
public class BlobURI
65+
{
66+
private String profile;
67+
private String name;
68+
private String compression;
69+
private String encryptionKey;
70+
private Integer blockCount;
71+
72+
/**
73+
* Blob storage profile name.
74+
* @return
75+
*/
76+
public String getProfile() {
77+
return profile;
78+
}
79+
80+
/**
81+
* Blob name
82+
*
83+
* @return
84+
*/
85+
public String getName() {
86+
return BlobUtils.relativize(name);
87+
}
88+
89+
/**
90+
* Compression type (e.g. {@code "dfl"} for Deflate)
91+
*
92+
* @return
93+
*/
94+
public String getCompression() {
95+
return compression;
96+
}
97+
98+
/**
99+
* Encryption key name
100+
*
101+
* @return
102+
*/
103+
public String getEncryptionKey() {
104+
return encryptionKey;
105+
}
106+
107+
/**
108+
* Total count of blocks
109+
* @return
110+
*/
111+
public Integer getBlockCount() {
112+
return blockCount;
113+
}
114+
115+
/**
116+
* Blob storage profile name.
117+
* <p>
118+
* This parameter will be stored as a host of a URI.
119+
*
120+
* @param profile
121+
* @return
122+
*/
123+
public BlobURI setProfile(String profile) {
124+
this.profile = profile;
125+
return this;
126+
}
127+
128+
/**
129+
* Blob name
130+
* <p>
131+
* This parameter will be stored as a path of a URI.
132+
*
133+
* @param path
134+
* @return
135+
*/
136+
public BlobURI setName(String path)
137+
{
138+
// URI requires absolute path, add leading "/" if not already set
139+
this.name = (path.charAt(0) == '/') ? path : "/" + path;
140+
return this;
141+
}
142+
143+
/**
144+
* Compression type (e.g. {@code "dfl"} for Deflate)
145+
* <p>
146+
* This parameter will be stored in the query part of a URI.
147+
*
148+
* @param compressionType
149+
* @return
150+
*/
151+
public BlobURI setCompression(String compressionType) {
152+
this.compression = compressionType;
153+
return this;
154+
}
155+
156+
/**
157+
* Encryption key name which can was used for blob encryption.
158+
* <p>
159+
* This parameter will be stored in the query part of a URI.
160+
*
161+
* @param encryptionKey
162+
* @return
163+
*/
164+
public BlobURI setEncryptionKey(String encryptionKey) {
165+
this.encryptionKey = encryptionKey;
166+
return this;
167+
}
168+
169+
/**
170+
* Total number of blocks blob was split into.
171+
* <p>
172+
* This parameter will be stored in the query part of a URI.
173+
*
174+
* @param blockCount
175+
* @return
176+
*/
177+
public BlobURI setBlockCount(Integer blockCount) {
178+
this.blockCount = blockCount;
179+
return this;
180+
}
181+
182+
/**
183+
* Parse URI and populate fields
184+
*
185+
* @return
186+
*/
187+
public BlobURI fromURI(URI uri)
188+
{
189+
Assert.notNull(uri, "URI cannot be null");
190+
Assert.isTrue(uri.getScheme().equals(URI_SCHEME), "Invalid URI scheme specified for blob: " + uri.getScheme());
191+
Assert.notNull(uri.getHost(), "Invalid storage profile specified, unable to parse URI" + uri.toString());
192+
Assert.notNull(uri.getPath(), "Invalid blob name provided, unable to parse URI: " + uri.toString());
193+
194+
this.profile = uri.getHost();
195+
this.name = uri.getPath();
196+
197+
Multimap<String, String> queryParams = Queries.queryParser().apply(uri.getQuery());
198+
199+
if (queryParams.containsKey(URI_PARAM_ENCRYPTION_KEY)) {
200+
this.encryptionKey = queryParams.get(URI_PARAM_ENCRYPTION_KEY).toArray(new String[0])[0];
201+
}
202+
203+
if (queryParams.containsKey(URI_PARAM_COMPRESSION)) {
204+
this.compression = queryParams.get(URI_PARAM_COMPRESSION).toArray(new String[0])[0];
205+
}
206+
207+
if (queryParams.containsKey(URI_PARAM_BLOCK_COUNT)) {
208+
this.blockCount = Integer.parseInt(queryParams.get(URI_PARAM_BLOCK_COUNT).toArray(new String[0])[0]);
209+
}
210+
211+
return this;
212+
}
213+
214+
/**
215+
* Build blob {@link URI} from given parameters
216+
* <p>
217+
* Examples:
218+
* <p>
219+
* <code>blob://db/f1ca99e0-99a0-11e2-95f0-040cced3bd7a?c=dfl&b=1</code>
220+
* <p>
221+
* <code>blob://aws3-bucket/f1ca99e0-99a0-11e2-95f0-040cced3bd7a:myemail?c=dfl&e=ekey2</code>
222+
*/
223+
public URI buildURI()
224+
{
225+
Assert.notNull(this.profile, "Blob storage profile must be specified");
226+
Assert.notNull(this.name, "Blob name must be provided");
227+
228+
URI baseuri;
229+
230+
try {
231+
baseuri = new URI(URI_SCHEME, this.profile, this.name, null);
232+
} catch (URISyntaxException e) {
233+
throw new IllegalArgumentException("Invalid blob profile or path: ", e);
234+
}
235+
236+
UriBuilder ub = Uris.uriBuilder(baseuri);
237+
238+
if (this.compression != null) {
239+
ub.addQuery(URI_PARAM_COMPRESSION, this.compression);
240+
}
241+
242+
if (this.encryptionKey != null) {
243+
ub.addQuery(URI_PARAM_ENCRYPTION_KEY, this.encryptionKey);
244+
}
245+
246+
if (this.blockCount != null) {
247+
ub.addQuery(URI_PARAM_BLOCK_COUNT, Integer.toString(this.blockCount));
248+
}
249+
250+
return ub.build();
251+
}
252+
}

‎modules/core/src/main/java/com/elasticinbox/core/blob/BlobUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
public class BlobUtils
3232
{
3333
/**
34-
* Make absolute path relative by dropping first forward-slash
34+
* Convert absolute path to relative by dropping first forward-slash
3535
*
3636
* @param path
3737
* @return

‎modules/core/src/main/java/com/elasticinbox/core/blob/store/CompressionHandler.java ‎modules/core/src/main/java/com/elasticinbox/core/blob/compression/CompressionHandler.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2727
*/
2828

29-
package com.elasticinbox.core.blob.store;
29+
package com.elasticinbox.core.blob.compression;
3030

3131
import java.io.InputStream;
3232

@@ -52,4 +52,11 @@ public interface CompressionHandler
5252
* @return Uncompressed input stream
5353
*/
5454
public InputStream uncompress(InputStream in);
55+
56+
/**
57+
* Returns compression type
58+
*
59+
* @return
60+
*/
61+
public String getType();
5562
}

‎modules/core/src/main/java/com/elasticinbox/core/blob/store/DeflateCompressionHandler.java ‎modules/core/src/main/java/com/elasticinbox/core/blob/compression/DeflateCompressionHandler.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2727
*/
2828

29-
package com.elasticinbox.core.blob.store;
29+
package com.elasticinbox.core.blob.compression;
3030

3131
import java.io.InputStream;
3232
import java.util.zip.DeflaterInputStream;
@@ -49,11 +49,20 @@
4949
*/
5050
public class DeflateCompressionHandler implements CompressionHandler
5151
{
52+
public final static String COMPRESSION_TYPE_DEFLATE = "dfl";
53+
54+
@Override
5255
public InputStream compress(InputStream in) {
5356
return new DeflaterInputStream(in);
5457
}
5558

59+
@Override
5660
public InputStream uncompress(InputStream in) {
5761
return new InflaterInputStream(in);
5862
}
63+
64+
@Override
65+
public String getType() {
66+
return COMPRESSION_TYPE_DEFLATE;
67+
}
5968
}

‎modules/core/src/main/java/com/elasticinbox/core/blob/store/AESEncryptionHandler.java ‎modules/core/src/main/java/com/elasticinbox/core/blob/encryption/AESEncryptionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2727
*/
2828

29-
package com.elasticinbox.core.blob.store;
29+
package com.elasticinbox.core.blob.encryption;
3030

3131
import java.io.InputStream;
3232
import java.security.InvalidAlgorithmParameterException;

‎modules/core/src/main/java/com/elasticinbox/core/blob/store/EncryptionHandler.java ‎modules/core/src/main/java/com/elasticinbox/core/blob/encryption/EncryptionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2727
*/
2828

29-
package com.elasticinbox.core.blob.store;
29+
package com.elasticinbox.core.blob.encryption;
3030

3131
import java.io.InputStream;
3232
import java.security.GeneralSecurityException;

‎modules/core/src/main/java/com/elasticinbox/core/blob/naming/BlobNameBuilder.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,14 @@ public String build() {
7575
}
7676

7777
/**
78-
* Validate generated Blob name
78+
* Validate generated Blob name.
79+
* <p>
80+
* Suffix no longer used for compression identification. Keep for backward
81+
* compatibility with 0.3.
7982
*
8083
* @param name
8184
*/
85+
@Deprecated
8286
private final static void validateBlobName(String name) {
8387
Assert.isFalse(
8488
name.endsWith(BlobStoreConstants.COMPRESS_SUFFIX),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.elasticinbox.core.blob.store;
2+
3+
import java.io.IOException;
4+
import java.security.MessageDigest;
5+
6+
import com.elasticinbox.core.blob.compression.CompressionHandler;
7+
import com.elasticinbox.core.blob.encryption.EncryptionHandler;
8+
9+
public abstract class AbstractBlobStorage implements BlobStorage
10+
{
11+
protected final CompressionHandler compressionHandler;
12+
protected final EncryptionHandler encryptionHandler;
13+
14+
/**
15+
* Constructor
16+
*
17+
* @param ch Injected Compression Handler
18+
* @param eh Injected Encryption Handler
19+
*/
20+
public AbstractBlobStorage(CompressionHandler ch, EncryptionHandler eh) {
21+
this.compressionHandler = ch;
22+
this.encryptionHandler = eh;
23+
}
24+
25+
/**
26+
* Generate cipher initialisation vector (IV) from Blob name.
27+
*
28+
* IV should be unique but not necessarily secure. Since blob names are
29+
* based on Type1 UUID they are unique.
30+
*
31+
* @param blobName
32+
* @return
33+
* @throws IOException
34+
*/
35+
protected static byte[] getCipherIVFromBlobName(final String blobName) throws IOException
36+
{
37+
byte[] iv;
38+
39+
try {
40+
byte[] nameBytes = blobName.getBytes("UTF-8");
41+
MessageDigest md = MessageDigest.getInstance("MD5");
42+
iv = md.digest(nameBytes);
43+
} catch (Exception e) {
44+
// should never happen
45+
throw new IOException(e);
46+
}
47+
48+
return iv;
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright (c) 2011-2012 Optimax Software Ltd.
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
33
* All rights reserved.
44
*
55
* Redistribution and use in source and binary forms, with or without
@@ -32,40 +32,20 @@
3232
import java.io.InputStream;
3333
import java.net.URI;
3434
import java.security.GeneralSecurityException;
35-
import java.security.MessageDigest;
35+
import java.util.UUID;
3636

37-
import org.slf4j.Logger;
38-
import org.slf4j.LoggerFactory;
39-
40-
import com.elasticinbox.common.utils.Assert;
41-
import com.elasticinbox.config.Configurator;
4237
import com.elasticinbox.core.blob.BlobDataSource;
43-
import com.elasticinbox.core.blob.BlobUtils;
38+
import com.elasticinbox.core.model.Mailbox;
4439

45-
public class BlobStorage
40+
public interface BlobStorage
4641
{
47-
private static final Logger logger =
48-
LoggerFactory.getLogger(BlobStorage.class);
49-
50-
private final CompressionHandler compressionHandler;
51-
private final EncryptionHandler encryptionHandler;
52-
53-
/**
54-
* Constructor
55-
*
56-
* @param ch Injected Compression Handler
57-
* @param eh Injected Encryption Handler
58-
*/
59-
public BlobStorage(CompressionHandler ch, EncryptionHandler eh) {
60-
this.compressionHandler = ch;
61-
this.encryptionHandler = eh;
62-
}
63-
6442
/**
6543
* Store blob contents, optionally compress and encrypt.
6644
*
67-
* @param blobName
68-
* Blob filename including relative path
45+
* @param messageId
46+
* Unique message ID
47+
* @param mailbox
48+
* Message owner's Mailbox
6949
* @param profileName
7050
* Blob store profile name
7151
* @param in
@@ -76,99 +56,24 @@ public BlobStorage(CompressionHandler ch, EncryptionHandler eh) {
7656
* @throws IOException
7757
* @throws GeneralSecurityException
7858
*/
79-
public URI write(String blobName, final String profileName, final InputStream in, final Long size)
80-
throws IOException, GeneralSecurityException
81-
{
82-
Assert.notNull(in, "No data to store");
83-
84-
InputStream in1, in2;
85-
Long processedSize = size;
86-
87-
// compress stream
88-
if ((compressionHandler != null) && (size > BlobStoreConstants.MIN_COMPRESS_SIZE))
89-
{
90-
in1 = compressionHandler.compress(in);
91-
blobName = blobName + BlobStoreConstants.COMPRESS_SUFFIX;
92-
93-
// size changed, set to unknown to recalculate
94-
processedSize = null;
95-
} else {
96-
in1 = in;
97-
}
98-
99-
// encrypt stream
100-
if (encryptionHandler != null)
101-
{
102-
byte[] iv = getCipherIVFromBlobName(blobName);
103-
in2 = this.encryptionHandler.encrypt(in1, Configurator.getBlobStoreDefaultEncryptionKey(), iv);
104-
105-
// size changed, set to unknown to recalculate
106-
processedSize = null;
107-
} else {
108-
in2 = in1;
109-
}
110-
111-
URI uri = BlobStoreProxy.write(blobName, profileName, in2, processedSize);
112-
113-
return uri;
114-
}
115-
59+
public URI write(final UUID messageId, final Mailbox mailbox, final String profileName, final InputStream in, final Long size)
60+
throws IOException, GeneralSecurityException;
61+
11662
/**
117-
* Read Blob contents and decrypt
63+
* Read blob contents and decrypt
11864
*
11965
* @param uri Blob URI
120-
* @param keyAlias Cipher Key Alias
121-
* @param uncompress Specifies if blob should be uncompressed.
12266
* @return
12367
* @throws IOException
12468
*/
125-
public BlobDataSource read(final URI uri, final String keyAlias) throws IOException
126-
{
127-
InputStream in;
128-
129-
if (encryptionHandler != null && keyAlias != null)
130-
{
131-
try {
132-
logger.debug("Decrypting object {} with key {}", uri, keyAlias);
69+
public BlobDataSource read(final URI uri) throws IOException;
13370

134-
byte[] iv = getCipherIVFromBlobName(BlobUtils.relativize(uri.getPath()));
135-
136-
in = this.encryptionHandler.decrypt(BlobStoreProxy.read(uri),
137-
Configurator.getEncryptionKey(keyAlias), iv);
138-
} catch (GeneralSecurityException gse) {
139-
throw new IOException("Unable to decrypt message blob: ", gse);
140-
}
141-
} else {
142-
in = BlobStoreProxy.read(uri);
143-
}
144-
145-
return new BlobDataSource(uri, in, this.compressionHandler);
146-
}
147-
14871
/**
149-
* Generate cipher initialisation vector (IV) from Blob name.
72+
* Delete blob
15073
*
151-
* IV should be unique but not necessarily secure. Since blob names are
152-
* based on Type1 UUID they are unique.
153-
*
154-
* @param blobName
155-
* @return
156-
* @throws IOException
74+
* @param uri
75+
* @throws IOException
15776
*/
158-
private static byte[] getCipherIVFromBlobName(final String blobName) throws IOException
159-
{
160-
byte[] iv;
161-
162-
try {
163-
byte[] nameBytes = blobName.getBytes("UTF-8");
164-
MessageDigest md = MessageDigest.getInstance("MD5");
165-
iv = md.digest(nameBytes);
166-
} catch (Exception e) {
167-
// should never happen
168-
throw new IOException(e);
169-
}
170-
171-
return iv;
172-
}
77+
public void delete(final URI uri) throws IOException;
17378

17479
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.blob.store;
30+
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.net.URI;
34+
import java.security.GeneralSecurityException;
35+
import java.util.UUID;
36+
37+
import com.elasticinbox.common.utils.Assert;
38+
import com.elasticinbox.config.Configurator;
39+
import com.elasticinbox.config.DatabaseConstants;
40+
import com.elasticinbox.core.blob.BlobDataSource;
41+
import com.elasticinbox.core.blob.BlobURI;
42+
import com.elasticinbox.core.blob.compression.CompressionHandler;
43+
import com.elasticinbox.core.blob.encryption.EncryptionHandler;
44+
import com.elasticinbox.core.model.Mailbox;
45+
46+
/**
47+
* Blob storage mediator is an abstraction layer which contains logic which
48+
* determines where to store or how to access given blob.
49+
*
50+
* @author Rustam Aliyev
51+
*/
52+
public final class BlobStorageMediator implements BlobStorage
53+
{
54+
private BlobStorage cloudBlobStorage;
55+
private BlobStorage dbBlobStorage;
56+
57+
public BlobStorageMediator(CompressionHandler ch, EncryptionHandler eh) {
58+
cloudBlobStorage = new CloudBlobStorage(ch, eh);
59+
dbBlobStorage = new CassandraBlobStorage(ch, eh);
60+
}
61+
62+
public URI write(UUID messageId, Mailbox mailbox, String profileName,
63+
InputStream in, Long size) throws IOException,
64+
GeneralSecurityException
65+
{
66+
Assert.notNull(in, "No data to store");
67+
68+
if (size <= Configurator.getDatabaseBlobMaxSize()) {
69+
return dbBlobStorage.write(messageId, mailbox, null, in, size);
70+
} else {
71+
return cloudBlobStorage.write(messageId, mailbox, Configurator.getBlobStoreWriteProfileName(), in, size);
72+
}
73+
}
74+
75+
public BlobDataSource read(URI uri) throws IOException
76+
{
77+
// check if blob was stored for the message
78+
Assert.notNull(uri, "URI cannot be null");
79+
80+
boolean isDbProfile = new BlobURI().fromURI(uri).getProfile()
81+
.equals(DatabaseConstants.DATABASE_PROFILE);
82+
83+
if (isDbProfile) {
84+
return dbBlobStorage.read(uri);
85+
} else {
86+
return cloudBlobStorage.read(uri);
87+
}
88+
}
89+
90+
public void delete(URI uri) throws IOException
91+
{
92+
// check if blob was stored for the message, silently skip otherwise
93+
if (uri == null) {
94+
return;
95+
}
96+
97+
boolean isDbProfile = new BlobURI().fromURI(uri).getProfile()
98+
.equals(DatabaseConstants.DATABASE_PROFILE);
99+
100+
if (isDbProfile) {
101+
dbBlobStorage.delete(uri);
102+
} else {
103+
cloudBlobStorage.delete(uri);
104+
}
105+
}
106+
}

‎modules/core/src/main/java/com/elasticinbox/core/blob/store/BlobStoreConstants.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,27 @@
4040
public final class BlobStoreConstants
4141
{
4242
/** URI schema used to identify blobs. */
43-
public static final String BLOB_URI_SCHEMA = "blob";
44-
43+
public static final String URI_SCHEME = "blob";
44+
45+
/** URI query parameter specifying compression */
46+
public static final String URI_PARAM_COMPRESSION = "c";
47+
48+
/** URI query parameter specifying encryption key */
49+
public static final String URI_PARAM_ENCRYPTION_KEY = "e";
50+
51+
/** URI query parameter specifying total block count */
52+
public static final String URI_PARAM_BLOCK_COUNT = "b";
53+
4554
/** This suffix used to differentiate compressed files from not compressed. */
55+
@Deprecated
4656
public static final String COMPRESS_SUFFIX = ".dfl";
47-
57+
4858
/** Files smaller that this parameter should not be compressed. In bytes. */
4959
public static final Integer MIN_COMPRESS_SIZE = 512;
5060

61+
/** Default block ID. Currently only single block DB operations supported. */
62+
public static final int DATABASE_DEFAULT_BLOCK_ID = 0;
63+
5164
/**
5265
* Providers that are independently configurable. Currently invisible form jClouds.
5366
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.blob.store;
30+
31+
import static com.elasticinbox.config.DatabaseConstants.*;
32+
import static com.elasticinbox.core.blob.store.BlobStoreConstants.*;
33+
34+
import java.io.IOException;
35+
import java.io.InputStream;
36+
import java.net.URI;
37+
import java.security.GeneralSecurityException;
38+
import java.util.UUID;
39+
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
import com.elasticinbox.common.utils.Assert;
44+
import com.elasticinbox.core.blob.BlobDataSource;
45+
import com.elasticinbox.core.blob.BlobURI;
46+
import com.elasticinbox.core.blob.compression.CompressionHandler;
47+
import com.elasticinbox.core.blob.encryption.EncryptionHandler;
48+
import com.elasticinbox.core.cassandra.persistence.BlobPersistence;
49+
import com.elasticinbox.core.model.Mailbox;
50+
import com.google.common.io.ByteStreams;
51+
52+
/**
53+
* Blob storage proxy for Cassandra
54+
*
55+
* @author Rustam Aliyev
56+
*/
57+
public final class CassandraBlobStorage extends AbstractBlobStorage
58+
{
59+
private static final Logger logger =
60+
LoggerFactory.getLogger(CassandraBlobStorage.class);
61+
62+
/**
63+
* Constructor
64+
*
65+
* @param ch Injected Compression Handler
66+
* @param eh Injected Encryption Handler
67+
*/
68+
public CassandraBlobStorage(CompressionHandler ch, EncryptionHandler eh) {
69+
super(ch, eh);
70+
}
71+
72+
@Override
73+
public URI write(final UUID messageId, final Mailbox mailbox, final String profileName, final InputStream in, final Long size)
74+
throws IOException, GeneralSecurityException
75+
{
76+
Assert.isTrue(size <= MAX_BLOB_SIZE, "Blob larger than " + MAX_BLOB_SIZE
77+
+ " bytes can't be stored in Cassandra. Provided blob size: " + size + " bytes");
78+
79+
logger.debug("Storing blob {} in Cassandra", messageId);
80+
81+
// prepare URI
82+
BlobURI blobUri = new BlobURI()
83+
.setProfile(DATABASE_PROFILE)
84+
.setName(messageId.toString()).setBlockCount(1);
85+
86+
InputStream in1;
87+
88+
// compress stream
89+
if ((compressionHandler != null) && (size > MIN_COMPRESS_SIZE))
90+
{
91+
in1 = compressionHandler.compress(in);
92+
blobUri.setCompression(compressionHandler.getType());
93+
} else {
94+
in1 = in;
95+
}
96+
97+
// store blob
98+
// TODO: currently we allow only single block writes (blockid=0). in future we can split blobs to multiple blocks
99+
BlobPersistence.writeBlock(messageId, DATABASE_DEFAULT_BLOCK_ID, ByteStreams.toByteArray(in1));
100+
101+
return blobUri.buildURI();
102+
}
103+
104+
@Override
105+
public BlobDataSource read(final URI uri) throws IOException
106+
{
107+
logger.debug("Reading blob {} from Cassandra", uri);
108+
109+
BlobURI blobUri = new BlobURI().fromURI(uri);
110+
Assert.isTrue(blobUri.getProfile().equals(DATABASE_PROFILE), "Blob store profile does not match database.");
111+
112+
UUID messageId = UUID.fromString(blobUri.getName());
113+
byte[] messageBlock = BlobPersistence.readBlock(messageId, DATABASE_DEFAULT_BLOCK_ID);
114+
InputStream in = ByteStreams.newInputStreamSupplier(messageBlock).getInput();
115+
116+
return new BlobDataSource(uri, in, this.compressionHandler);
117+
}
118+
119+
@Override
120+
public void delete(URI uri) throws IOException
121+
{
122+
logger.debug("Deleting blob {}", uri);
123+
124+
BlobURI blobUri = new BlobURI().fromURI(uri);
125+
Assert.isTrue(blobUri.getProfile().equals(DATABASE_PROFILE), "Blob store profile does not match database.");
126+
127+
UUID messageId = UUID.fromString(blobUri.getName());
128+
BlobPersistence.deleteBlock(messageId, BlobStoreConstants.DATABASE_DEFAULT_BLOCK_ID);
129+
}
130+
131+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/**
2+
* Copyright (c) 2011-2012 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.blob.store;
30+
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.net.URI;
34+
import java.security.GeneralSecurityException;
35+
import java.util.UUID;
36+
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import com.elasticinbox.config.Configurator;
41+
import com.elasticinbox.core.blob.BlobDataSource;
42+
import com.elasticinbox.core.blob.BlobURI;
43+
import com.elasticinbox.core.blob.BlobUtils;
44+
import com.elasticinbox.core.blob.compression.CompressionHandler;
45+
import com.elasticinbox.core.blob.encryption.EncryptionHandler;
46+
import com.elasticinbox.core.blob.naming.BlobNameBuilder;
47+
import com.elasticinbox.core.model.Mailbox;
48+
49+
public final class CloudBlobStorage extends AbstractBlobStorage
50+
{
51+
private static final Logger logger =
52+
LoggerFactory.getLogger(CloudBlobStorage.class);
53+
54+
/**
55+
* Constructor
56+
*
57+
* @param ch Injected Compression Handler
58+
* @param eh Injected Encryption Handler
59+
*/
60+
public CloudBlobStorage(CompressionHandler ch, EncryptionHandler eh) {
61+
super(ch, eh);
62+
}
63+
64+
@Override
65+
public URI write(final UUID messageId, final Mailbox mailbox, final String profileName, final InputStream in, final Long size)
66+
throws IOException, GeneralSecurityException
67+
{
68+
// get blob name
69+
String blobName = new BlobNameBuilder().setMailbox(mailbox)
70+
.setMessageId(messageId).setMessageSize(size).build();
71+
72+
InputStream in1, in2;
73+
Long processedSize = size;
74+
75+
// prepare URI
76+
BlobURI blobUri = new BlobURI()
77+
.setProfile(profileName)
78+
.setName(blobName);
79+
80+
// compress stream
81+
if ((compressionHandler != null) && (size > BlobStoreConstants.MIN_COMPRESS_SIZE))
82+
{
83+
in1 = compressionHandler.compress(in);
84+
blobUri.setCompression(compressionHandler.getType());
85+
86+
// size changed, set to unknown to recalculate
87+
processedSize = null;
88+
} else {
89+
in1 = in;
90+
}
91+
92+
// encrypt stream
93+
if (encryptionHandler != null)
94+
{
95+
byte[] iv = getCipherIVFromBlobName(blobName);
96+
in2 = this.encryptionHandler.encrypt(in1, Configurator.getBlobStoreDefaultEncryptionKey(), iv);
97+
98+
blobUri.setEncryptionKey(Configurator.getBlobStoreDefaultEncryptionKeyAlias());
99+
100+
// size changed, set to unknown to recalculate
101+
processedSize = null;
102+
} else {
103+
in2 = in1;
104+
}
105+
106+
CloudStoreProxy.write(blobName, profileName, in2, processedSize);
107+
108+
return blobUri.buildURI();
109+
}
110+
111+
@Override
112+
public BlobDataSource read(final URI uri) throws IOException
113+
{
114+
InputStream in;
115+
116+
BlobURI blobUri = new BlobURI().fromURI(uri);
117+
String keyAlias = blobUri.getEncryptionKey();
118+
119+
if (encryptionHandler != null && keyAlias != null)
120+
{
121+
try {
122+
logger.debug("Decrypting object {} with key {}", uri, keyAlias);
123+
124+
byte[] iv = getCipherIVFromBlobName(BlobUtils.relativize(uri.getPath()));
125+
126+
in = this.encryptionHandler.decrypt(CloudStoreProxy.read(uri),
127+
Configurator.getEncryptionKey(keyAlias), iv);
128+
} catch (GeneralSecurityException gse) {
129+
throw new IOException("Unable to decrypt message blob: ", gse);
130+
}
131+
} else {
132+
in = CloudStoreProxy.read(uri);
133+
}
134+
135+
return new BlobDataSource(uri, in, this.compressionHandler);
136+
}
137+
138+
@Override
139+
public void delete(final URI uri) throws IOException
140+
{
141+
CloudStoreProxy.delete(uri);
142+
}
143+
}

‎modules/core/src/main/java/com/elasticinbox/core/blob/store/BlobStoreProxy.java ‎modules/core/src/main/java/com/elasticinbox/core/blob/store/CloudStoreProxy.java

+6-28
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.io.IOException;
3333
import java.io.InputStream;
3434
import java.net.URI;
35-
import java.net.URISyntaxException;
3635
import java.security.GeneralSecurityException;
3736
import java.util.Properties;
3837
import java.util.concurrent.ConcurrentHashMap;
@@ -68,10 +67,10 @@
6867
* @see {@link BlobStoreContext}
6968
* @see <a href="http://www.jclouds.org/">jClouds</a>
7069
*/
71-
public final class BlobStoreProxy
70+
public final class CloudStoreProxy
7271
{
7372
private static final Logger logger =
74-
LoggerFactory.getLogger(BlobStoreProxy.class);
73+
LoggerFactory.getLogger(CloudStoreProxy.class);
7574

7675
private static final String PROVIDER_FILESYSTEM = "filesystem";
7776
private static final String PROVIDER_TRANSIENT = "transient";
@@ -94,7 +93,7 @@ public final class BlobStoreProxy
9493
* @return
9594
* @throws IOException
9695
*/
97-
public static URI write(final String blobName, final String profileName, InputStream in, final Long size)
96+
public static void write(final String blobName, final String profileName, InputStream in, final Long size)
9897
throws IOException, GeneralSecurityException
9998
{
10099
Assert.notNull(in, "No data to store");
@@ -124,8 +123,6 @@ public static URI write(final String blobName, final String profileName, InputSt
124123

125124
// store blob
126125
blobStore.putBlob(container, blobBuilder.build());
127-
128-
return buildURI(profileName, blobName);
129126
}
130127

131128
/**
@@ -139,7 +136,7 @@ public static InputStream read(URI uri)
139136
// check if blob was stored for the message
140137
Assert.notNull(uri, "URI cannot be null");
141138

142-
logger.debug("Reading object {}", uri);
139+
logger.debug("Reading blob {}", uri);
143140

144141
String profileName = uri.getHost();
145142
String container = Configurator.getBlobStoreProfile(profileName).getContainer();
@@ -165,7 +162,7 @@ public static void delete(URI uri)
165162
return;
166163
}
167164

168-
logger.debug("Deleting object {}", uri);
165+
logger.debug("Deleting blob {}", uri);
169166

170167
String profileName = uri.getHost();
171168
BlobStoreProfile profile = Configurator.getBlobStoreProfile(profileName);
@@ -216,7 +213,7 @@ private static BlobStoreContext getBlobStoreContext(String profileName)
216213
if(blobStoreContexts.containsKey(profileName)) {
217214
return blobStoreContexts.get(profileName);
218215
} else {
219-
synchronized (BlobStoreProxy.class)
216+
synchronized (CloudStoreProxy.class)
220217
{
221218
logger.debug("Creating new connection for '{}' blob store.", profileName);
222219

@@ -262,25 +259,6 @@ private static BlobStoreContext getBlobStoreContext(String profileName)
262259
}
263260
}
264261

265-
/**
266-
* Build {@link URI} from blobstore profile and blob path
267-
*
268-
* @param profile blobstore profile name
269-
* @param path blob path
270-
* @return
271-
*/
272-
public static URI buildURI(final String profile, final String path)
273-
{
274-
// URI requires absolute path, add leading "/" if not already set
275-
String absolutePath = (path.charAt(0) == '/') ? path : "/" + path;
276-
277-
try {
278-
return new URI(BlobStoreConstants.BLOB_URI_SCHEMA, profile, absolutePath, null);
279-
} catch (URISyntaxException e) {
280-
throw new IllegalArgumentException("Invalid blob profile or path: ", e);
281-
}
282-
}
283-
284262
/**
285263
* Close all blob store connections
286264
*/

‎modules/core/src/main/java/com/elasticinbox/core/cassandra/CassandraAccountDAO.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646
import com.elasticinbox.config.Configurator;
4747
import com.elasticinbox.core.AccountDAO;
4848
import com.elasticinbox.core.MessageDAO;
49-
import com.elasticinbox.core.blob.store.BlobStoreProxy;
49+
import com.elasticinbox.core.blob.store.BlobStorage;
50+
import com.elasticinbox.core.blob.store.BlobStorageMediator;
5051
import com.elasticinbox.core.cassandra.persistence.AccountPersistence;
5152
import com.elasticinbox.core.cassandra.persistence.LabelCounterPersistence;
5253
import com.elasticinbox.core.cassandra.persistence.LabelIndexPersistence;
@@ -61,12 +62,15 @@ public final class CassandraAccountDAO implements AccountDAO
6162
private final Keyspace keyspace;
6263
private final static StringSerializer strSe = StringSerializer.get();
6364

65+
private final BlobStorage blobStorage;
66+
6467
@SuppressWarnings("unused")
6568
private final static Logger logger = LoggerFactory
6669
.getLogger(CassandraAccountDAO.class);
6770

6871
public CassandraAccountDAO(Keyspace keyspace) {
6972
this.keyspace = keyspace;
73+
this.blobStorage = new BlobStorageMediator(null, null);
7074
}
7175

7276
@Override
@@ -88,7 +92,7 @@ public void add(final Mailbox mailbox) throws IOException, IllegalArgumentExcept
8892
public void delete(final Mailbox mailbox) throws IOException
8993
{
9094
// purge all previously deleted objects
91-
// TODO: we should not instantinate here
95+
// TODO: we should not instantiate here
9296
MessageDAO messageDAO = new CassandraMessageDAO(keyspace);
9397
messageDAO.purge(mailbox, new Date());
9498

@@ -110,7 +114,7 @@ public void delete(final Mailbox mailbox) throws IOException
110114

111115
// delete message sources from object store
112116
for(UUID messageId : messages.keySet()) {
113-
BlobStoreProxy.delete(messages.get(messageId).getLocation());
117+
blobStorage.delete(messages.get(messageId).getLocation());
114118
}
115119

116120
// set start element for the next loop

‎modules/core/src/main/java/com/elasticinbox/core/cassandra/CassandraDAOFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public final class CassandraDAOFactory extends DAOFactory
5656

5757
public final static String CF_ACCOUNTS = "Accounts";
5858
public final static String CF_METADATA = "MessageMetadata";
59+
public final static String CF_BLOB = "MessageBlob";
5960
public final static String CF_LABEL_INDEX = "IndexLabels";
6061
public final static String CF_COUNTERS = "Counters";
6162

‎modules/core/src/main/java/com/elasticinbox/core/cassandra/CassandraMessageDAO.java

+14-19
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,12 @@
5454
import com.elasticinbox.core.MessageDAO;
5555
import com.elasticinbox.core.OverQuotaException;
5656
import com.elasticinbox.core.blob.BlobDataSource;
57-
import com.elasticinbox.core.blob.naming.BlobNameBuilder;
58-
import com.elasticinbox.core.blob.store.AESEncryptionHandler;
57+
import com.elasticinbox.core.blob.compression.CompressionHandler;
58+
import com.elasticinbox.core.blob.compression.DeflateCompressionHandler;
59+
import com.elasticinbox.core.blob.encryption.AESEncryptionHandler;
60+
import com.elasticinbox.core.blob.encryption.EncryptionHandler;
5961
import com.elasticinbox.core.blob.store.BlobStorage;
60-
import com.elasticinbox.core.blob.store.BlobStoreProxy;
61-
import com.elasticinbox.core.blob.store.CompressionHandler;
62-
import com.elasticinbox.core.blob.store.DeflateCompressionHandler;
63-
import com.elasticinbox.core.blob.store.EncryptionHandler;
62+
import com.elasticinbox.core.blob.store.BlobStorageMediator;
6463
import com.elasticinbox.core.cassandra.persistence.*;
6564
import com.elasticinbox.core.cassandra.utils.BatchConstants;
6665
import com.elasticinbox.core.cassandra.utils.ThrottlingMutator;
@@ -92,7 +91,7 @@ public CassandraMessageDAO(Keyspace keyspace)
9291
EncryptionHandler encryptionHandler =
9392
Configurator.isBlobStoreEncryptionEnabled() ? new AESEncryptionHandler() : null;
9493

95-
this.blobStorage = new BlobStorage(compressionHandler, encryptionHandler);
94+
this.blobStorage = new BlobStorageMediator(compressionHandler, encryptionHandler);
9695
}
9796

9897
@Override
@@ -106,7 +105,7 @@ public BlobDataSource getRaw(final Mailbox mailbox, final UUID messageId)
106105
throws IOException
107106
{
108107
Message metadata = MessagePersistence.fetch(mailbox.getId(), messageId, false);
109-
return blobStorage.read(metadata.getLocation(), metadata.getEncryptionKeyAlias());
108+
return blobStorage.read(metadata.getLocation());
110109
}
111110

112111
@Override
@@ -157,14 +156,10 @@ public void put(final Mailbox mailbox, UUID messageId, Message message, InputStr
157156
if (in != null)
158157
{
159158
try {
160-
// get blob name
161-
String blobName = new BlobNameBuilder()
162-
.setMailbox(mailbox).setMessageId(messageId)
163-
.setMessageSize(message.getSize()).build();
164-
165-
// store message in blobstore
166-
uri = blobStorage.write(blobName, Configurator.getBlobStoreWriteProfileName(), in, message.getSize());
167-
159+
uri = blobStorage.write(messageId, mailbox,
160+
Configurator.getBlobStoreWriteProfileName(), in,
161+
message.getSize());
162+
168163
// update location in metadata
169164
message.setLocation(uri);
170165
} catch (Exception e) {
@@ -198,7 +193,7 @@ public void put(final Mailbox mailbox, UUID messageId, Message message, InputStr
198193

199194
// rollback
200195
if (uri != null) {
201-
BlobStoreProxy.delete(uri);
196+
blobStorage.delete(uri);
202197
}
203198

204199
throw new IOException("Unable to store message metadata: ", e);
@@ -412,7 +407,7 @@ public void delete(final Mailbox mailbox, final List<UUID> messageIds)
412407
}
413408

414409
@Override
415-
public void purge(final Mailbox mailbox, final Date age)
410+
public void purge(final Mailbox mailbox, final Date age) throws IOException
416411
{
417412
Map<UUID, UUID> purgeIndex = null;
418413

@@ -433,7 +428,7 @@ public void purge(final Mailbox mailbox, final Date age)
433428

434429
// delete message sources from object store
435430
for(UUID messageId : messages.keySet()) {
436-
BlobStoreProxy.delete(messages.get(messageId).getLocation());
431+
blobStorage.delete(messages.get(messageId).getLocation());
437432
}
438433

439434
// purge expired (older than age) messages
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.cassandra.persistence;
30+
31+
import java.util.UUID;
32+
33+
import com.elasticinbox.core.cassandra.CassandraDAOFactory;
34+
35+
import static com.elasticinbox.config.DatabaseConstants.BLOB_BLOCK_SIZE;
36+
import static com.elasticinbox.core.cassandra.CassandraDAOFactory.CF_BLOB;
37+
import static me.prettyprint.hector.api.factory.HFactory.createColumn;
38+
39+
import me.prettyprint.cassandra.serializers.BytesArraySerializer;
40+
import me.prettyprint.cassandra.serializers.CompositeSerializer;
41+
import me.prettyprint.cassandra.serializers.IntegerSerializer;
42+
import me.prettyprint.cassandra.serializers.UUIDSerializer;
43+
import me.prettyprint.hector.api.beans.Composite;
44+
import me.prettyprint.hector.api.beans.HColumn;
45+
import me.prettyprint.hector.api.factory.HFactory;
46+
import me.prettyprint.hector.api.mutation.Mutator;
47+
import me.prettyprint.hector.api.query.ColumnQuery;
48+
import me.prettyprint.hector.api.query.QueryResult;
49+
50+
/**
51+
* Blob block operations. Currently supports only single block blobs.
52+
* <p>
53+
* Do not batch read/write requests. Delete requests can be batched up when
54+
* multi-block operations introduced).
55+
*
56+
* @author Rustam Aliyev
57+
*/
58+
public class BlobPersistence
59+
{
60+
private final static IntegerSerializer intSe = IntegerSerializer.get();
61+
private final static BytesArraySerializer byteSe = BytesArraySerializer.get();
62+
private final static UUIDSerializer uuidSe = UUIDSerializer.get();
63+
64+
/** Default sub-block ID. Currently only single sub-block operations supported. */
65+
public static int DEFAULT_SUB_BLOCK_ID = 0;
66+
67+
/**
68+
* Write blob block into Cassandra.
69+
*
70+
* @param objectId Blob ID
71+
* @param blockId Block ID (starting from 0)
72+
* @param data
73+
*/
74+
public static void writeBlock(final UUID objectId, final int blockId, byte[] data)
75+
{
76+
if (data.length > BLOB_BLOCK_SIZE) {
77+
throw new IllegalArgumentException("Data (" + data.length
78+
+ " bytes) is larger than the maximum block size ("
79+
+ BLOB_BLOCK_SIZE + " bytes)");
80+
}
81+
82+
Mutator<Composite> mutator = HFactory.createMutator(
83+
CassandraDAOFactory.getKeyspace(), CompositeSerializer.get());
84+
85+
Composite key = new Composite();
86+
key.addComponent(objectId, uuidSe);
87+
key.addComponent(blockId, intSe);
88+
89+
mutator.insert(key, CF_BLOB,
90+
createColumn(DEFAULT_SUB_BLOCK_ID, data, intSe, byteSe));
91+
}
92+
93+
/**
94+
* Read blob block from Cassandra.
95+
*
96+
* @param objectId Blob ID
97+
* @param blockId Block ID
98+
* @return
99+
*/
100+
public static byte[] readBlock(final UUID objectId, final int blockId)
101+
{
102+
Composite key = new Composite();
103+
key.addComponent(objectId, uuidSe);
104+
key.addComponent(blockId, intSe);
105+
106+
ColumnQuery<Composite, Integer, byte[]> q = HFactory.createColumnQuery(
107+
CassandraDAOFactory.getKeyspace(), CompositeSerializer.get(), intSe, byteSe);
108+
109+
QueryResult<HColumn<Integer, byte[]>> result = q.setColumnFamily(CF_BLOB)
110+
.setKey(key).setName(DEFAULT_SUB_BLOCK_ID).execute();
111+
112+
return result.get().getValue();
113+
}
114+
115+
/**
116+
* Delete blob block from Cassandra.
117+
*
118+
* @param objectId Blob ID
119+
* @param blockId Block ID
120+
*/
121+
public static void deleteBlock(final UUID objectId, final int blockId)
122+
{
123+
Composite key = new Composite();
124+
key.addComponent(objectId, uuidSe);
125+
key.addComponent(blockId, intSe);
126+
127+
Mutator<Composite> mutator = HFactory.createMutator(
128+
CassandraDAOFactory.getKeyspace(), CompositeSerializer.get());
129+
130+
mutator.delete(key, CF_BLOB, null, intSe);
131+
}
132+
}

‎modules/core/src/main/java/com/elasticinbox/core/cassandra/persistence/Marshaller.java

-8
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ public final class Marshaller
6666
public final static String CN_PLAIN_BODY = "plain";
6767
public final static String CN_PARTS = "parts";
6868
public final static String CN_BRI = "bri"; // Blob Resource Identifier
69-
public final static String CN_ENCRYPTION_KEY = "eka"; // Encryption Key Alias
7069
public final static String CN_LABEL_PREFIX = "l:";
7170
public final static String CN_MARKER_PREFIX = "m:";
7271

@@ -111,8 +110,6 @@ protected static Message unmarshall(
111110
message.setCc(unserializeAddress(c.getValue()));
112111
} else if (c.getName().equals(CN_BCC)) {
113112
message.setBcc(unserializeAddress(c.getValue()));
114-
} else if (c.getName().equals(CN_ENCRYPTION_KEY)) {
115-
message.setEncryptionKeyAlias(strSe.fromBytes(c.getValue()));
116113
} else if (c.getName().equals(CN_BRI)) {
117114
message.setLocation(URI.create(
118115
strSe.fromBytes(c.getValue())));
@@ -232,11 +229,6 @@ protected static List<HColumn<String, byte[]>> marshall(final Message m)
232229
columns.put(CN_PLAIN_BODY, IOUtils.compress(m.getPlainBody()));
233230
}
234231

235-
// add encryption key alias
236-
if (Configurator.isBlobStoreEncryptionEnabled()) {
237-
columns.put(CN_ENCRYPTION_KEY, Configurator.getBlobStoreDefaultEncryptionKeyAlias());
238-
}
239-
240232
return mapToHColumns(columns);
241233
}
242234

‎modules/core/src/main/java/com/elasticinbox/core/model/Message.java

+4-22
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@
3838

3939
import com.fasterxml.jackson.annotation.JsonIgnore;
4040

41-
import com.elasticinbox.core.blob.store.BlobStoreConstants;
42-
import com.elasticinbox.core.blob.store.BlobStoreProxy;
41+
import com.elasticinbox.core.blob.BlobURI;
4342

4443
/**
4544
* Representation of MIME message containing message headers, labels, markers,
@@ -160,17 +159,10 @@ public URI getLocation() {
160159

161160
public void setLocation(URI uri)
162161
{
163-
if (uri.getScheme().equals(BlobStoreConstants.BLOB_URI_SCHEMA)) {
164-
this.location = uri;
165-
} else {
166-
throw new IllegalArgumentException(
167-
"Invalid URI scheme specified for blob: " + uri.getScheme());
168-
}
169-
}
162+
// validate URI, should throw exception if invalid
163+
new BlobURI().fromURI(uri);
170164

171-
public void setLocation(String profile, String path)
172-
{
173-
this.location = BlobStoreProxy.buildURI(profile, path);
165+
this.location = uri;
174166
}
175167

176168
public String getPlainBody() {
@@ -270,16 +262,6 @@ public void addMarker(Marker marker) {
270262
this.markers.add(marker);
271263
}
272264

273-
@JsonIgnore
274-
public void setEncryptionKeyAlias(final String key) {
275-
this.encryptionKey = key;
276-
}
277-
278-
@JsonIgnore
279-
public String getEncryptionKeyAlias() {
280-
return this.encryptionKey;
281-
}
282-
283265
@JsonIgnore
284266
public boolean isEncrypted() {
285267
return this.encryptionKey != null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.blob;
30+
31+
import static org.junit.Assert.*;
32+
33+
import java.net.URI;
34+
35+
import org.junit.Test;
36+
37+
import com.elasticinbox.core.blob.BlobURI;
38+
39+
public class BlobURIUnitTest
40+
{
41+
@Test
42+
public void testFromURI1()
43+
{
44+
URI testUri = URI.create("blob://aws3-bucket/f1ca99e0-99a0-11e2-95f0-040cced3bd7a:myemail%40elasticinbox.com?c=dfl&e=ekey2");
45+
46+
BlobURI bu = new BlobURI();
47+
bu.fromURI(testUri);
48+
49+
assertEquals("aws3-bucket", bu.getProfile());
50+
assertEquals("f1ca99e0-99a0-11e2-95f0-040cced3bd7a:myemail@elasticinbox.com", bu.getName());
51+
assertEquals("dfl", bu.getCompression());
52+
assertEquals("ekey2", bu.getEncryptionKey());
53+
assertNull(bu.getBlockCount());
54+
}
55+
56+
@Test
57+
public void testFromURI2()
58+
{
59+
URI testUri = URI.create("blob://db/f1ca99e0-99a0-11e2-95f0-040cced3bd7a?c=gz&b=1");
60+
61+
BlobURI bu = new BlobURI();
62+
bu.fromURI(testUri);
63+
64+
assertEquals("db", bu.getProfile());
65+
assertEquals("f1ca99e0-99a0-11e2-95f0-040cced3bd7a", bu.getName());
66+
assertEquals("gz", bu.getCompression());
67+
assertEquals(new Integer(1), bu.getBlockCount());
68+
assertNull(bu.getEncryptionKey());
69+
}
70+
71+
@Test(expected=IllegalArgumentException.class)
72+
public void testFromBadSchemeURI()
73+
{
74+
URI testUri = URI.create("notblob://db/f1ca99e0-99a0-11e2-95f0-040cced3bd7a");
75+
new BlobURI().fromURI(testUri);
76+
}
77+
78+
@Test
79+
public void testBuildURI()
80+
{
81+
URI testUri = URI.create("blob://my-azure-bs/f1ca99e0-99a0-11e2-95f0-040cced3bd7a?c=dfl&e=k2&b=100");
82+
83+
BlobURI bu = new BlobURI()
84+
.setProfile("my-azure-bs")
85+
.setName("f1ca99e0-99a0-11e2-95f0-040cced3bd7a")
86+
.setBlockCount(100)
87+
.setCompression("dfl")
88+
.setEncryptionKey("k2");
89+
90+
assertEquals(testUri, bu.buildURI());
91+
}
92+
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/**
2+
* Copyright (c) 2011-2013 Optimax Software Ltd.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* * Neither the name of Optimax Software, ElasticInbox, nor the names
14+
* of its contributors may be used to endorse or promote products derived
15+
* from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
21+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
package com.elasticinbox.core.blob.store;
30+
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.lessThan;
33+
import static org.junit.Assert.assertThat;
34+
35+
import java.io.File;
36+
import java.io.FileInputStream;
37+
import java.io.IOException;
38+
import java.io.InputStream;
39+
import java.net.URI;
40+
import java.security.GeneralSecurityException;
41+
import java.util.UUID;
42+
43+
import org.junit.After;
44+
import org.junit.Before;
45+
import org.junit.Test;
46+
47+
import com.elasticinbox.common.utils.IOUtils;
48+
import com.elasticinbox.config.Configurator;
49+
import com.elasticinbox.config.DatabaseConstants;
50+
import com.elasticinbox.core.blob.BlobDataSource;
51+
import com.elasticinbox.core.blob.compression.DeflateCompressionHandler;
52+
import com.elasticinbox.core.blob.encryption.AESEncryptionHandler;
53+
import com.elasticinbox.core.model.Mailbox;
54+
55+
public class CassandraStorageTest
56+
{
57+
private final static String TEST_FILE = "../../itests/src/test/resources/01-simple-ascii.eml";
58+
private final static String TEST_LARGE_FILE = "../../itests/src/test/resources/01-attach-utf8.eml";
59+
private final static UUID MESSAGE_ID = UUID.fromString("f1ca99e0-99a0-11e2-95f0-040cced3bd7a");
60+
private final static Mailbox MAILBOX = new Mailbox("test@elasticinbox.com");
61+
private URI blobUri;
62+
63+
@Before
64+
public void setupCase()
65+
{
66+
//System.setProperty("elasticinbox.config", "../../config/elasticinbox.yaml");
67+
System.setProperty("elasticinbox.config", "../../itests/src/test/resources/elasticinbox.yaml");
68+
}
69+
70+
@After
71+
public void teardownCase() {
72+
}
73+
74+
@Test
75+
public void testBlobStorage() throws IOException, GeneralSecurityException
76+
{
77+
String expextedBlobUrl = "blob://"
78+
+ DatabaseConstants.DATABASE_PROFILE + "/"
79+
+ MESSAGE_ID + "?"
80+
+ BlobStoreConstants.URI_PARAM_BLOCK_COUNT + "=1";
81+
82+
// BlobStorage without encryption or compression
83+
AbstractBlobStorage bs = new CassandraBlobStorage(null, null);
84+
85+
// Write blob
86+
long origSize = testWrite(bs, TEST_FILE);
87+
88+
// Check written Blob URI
89+
assertThat(blobUri.toString(), equalTo(expextedBlobUrl));
90+
91+
// Read blob back
92+
BlobDataSource ds = bs.read(blobUri);
93+
long newSize = IOUtils.getInputStreamSize(ds.getUncompressedInputStream());
94+
95+
// Check written Blob size
96+
assertThat(newSize, equalTo(origSize));
97+
98+
// Delete
99+
bs.delete(blobUri);
100+
}
101+
102+
@Test(expected=IllegalArgumentException.class)
103+
public void testLargeBlobStorage() throws IOException, GeneralSecurityException
104+
{
105+
// BlobStorage without encryption or compression
106+
AbstractBlobStorage bs = new CassandraBlobStorage(null, null);
107+
108+
// Write blob which is too large for DB storage. Should throw exception.
109+
testWrite(bs, TEST_LARGE_FILE);
110+
}
111+
112+
@Test
113+
public void testBlobStorageWithEcnryptionAndCompression() throws IOException, GeneralSecurityException
114+
{
115+
String expextedBlobUrl = "blob://"
116+
+ DatabaseConstants.DATABASE_PROFILE + "/"
117+
+ MESSAGE_ID + "?"
118+
+ BlobStoreConstants.URI_PARAM_COMPRESSION + "="
119+
+ DeflateCompressionHandler.COMPRESSION_TYPE_DEFLATE + "&"
120+
+ BlobStoreConstants.URI_PARAM_BLOCK_COUNT + "=1";
121+
122+
// BlobStorage with encryption or compression
123+
AbstractBlobStorage bs = new CassandraBlobStorage(new DeflateCompressionHandler(), new AESEncryptionHandler());
124+
125+
// Write blob
126+
long origSize = testWrite(bs, TEST_FILE);
127+
128+
// Check Blob URI
129+
assertThat(blobUri.toString(), equalTo(expextedBlobUrl));
130+
131+
// Read blob back
132+
BlobDataSource ds = bs.read(blobUri);
133+
134+
// Verify that suffix matches
135+
assertThat(ds.isCompressed(), equalTo(true));
136+
137+
// Verify that compressed size is smaller
138+
long compressedSize = IOUtils.getInputStreamSize(ds.getInputStream());
139+
assertThat(compressedSize, lessThan(origSize));
140+
141+
// Read blob back again (can't reuse same InputStream)
142+
ds = bs.read(blobUri);
143+
long newSize = IOUtils.getInputStreamSize(ds.getUncompressedInputStream());
144+
145+
// Check Blob size
146+
assertThat(newSize, equalTo(origSize));
147+
148+
// Delete
149+
bs.delete(blobUri);
150+
}
151+
152+
private long testWrite(AbstractBlobStorage bs, String filename) throws IOException, GeneralSecurityException
153+
{
154+
File file = new File(filename);
155+
InputStream in = new FileInputStream(file);
156+
157+
blobUri = bs.write(MESSAGE_ID, MAILBOX, Configurator.getBlobStoreWriteProfileName(), in, file.length());
158+
in.close();
159+
160+
return file.length();
161+
}
162+
163+
}

‎modules/core/src/test/java/com/elasticinbox/core/blob/store/BlobStorageTest.java ‎modules/core/src/test/java/com/elasticinbox/core/blob/store/CloudStorageTest.java

+31-17
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import java.io.IOException;
3838
import java.io.InputStream;
3939
import java.net.URI;
40+
import java.net.URLEncoder;
4041
import java.security.GeneralSecurityException;
42+
import java.util.UUID;
4143

4244
import org.junit.After;
4345
import org.junit.Before;
@@ -46,11 +48,15 @@
4648
import com.elasticinbox.common.utils.IOUtils;
4749
import com.elasticinbox.config.Configurator;
4850
import com.elasticinbox.core.blob.BlobDataSource;
51+
import com.elasticinbox.core.blob.compression.DeflateCompressionHandler;
52+
import com.elasticinbox.core.blob.encryption.AESEncryptionHandler;
53+
import com.elasticinbox.core.model.Mailbox;
4954

50-
public class BlobStorageTest
55+
public class CloudStorageTest
5156
{
5257
private final static String TEST_FILE = "../../itests/src/test/resources/01-attach-utf8.eml";
53-
private final static String TEMP_BLOB = "tmp-email-id-0001";
58+
private final static UUID MESSAGE_ID = UUID.fromString("f1ca99e0-99a0-11e2-95f0-040cced3bd7a");
59+
private final static Mailbox MAILBOX = new Mailbox("test@elasticinbox.com");
5460
private URI blobUri;
5561

5662
@Before
@@ -67,15 +73,19 @@ public void teardownCase() {
6773
@Test
6874
public void testBlobStorage() throws IOException, GeneralSecurityException
6975
{
76+
String expextedBlobUrl = "blob://"
77+
+ Configurator.getBlobStoreWriteProfileName() + "/"
78+
+ MESSAGE_ID + ":"
79+
+ URLEncoder.encode(MAILBOX.getId(), "UTF-8");
80+
7081
// BlobStorage without encryption or compression
71-
BlobStorage bs = new BlobStorage(null, null);
72-
String expextedBlobUrl = "blob://"+ Configurator.getBlobStoreWriteProfileName() + "/" + TEMP_BLOB;
82+
AbstractBlobStorage bs = new CloudBlobStorage(null, null);
7383

7484
// Write blob
7585
long origSize = testWrite(bs);
7686

7787
// Read blob back
78-
BlobDataSource ds = bs.read(blobUri, null);
88+
BlobDataSource ds = bs.read(blobUri);
7989
long newSize = IOUtils.getInputStreamSize(ds.getUncompressedInputStream());
8090

8191
// Check written Blob URI
@@ -85,15 +95,23 @@ public void testBlobStorage() throws IOException, GeneralSecurityException
8595
assertThat(newSize, equalTo(origSize));
8696

8797
// Delete
88-
testDelete();
98+
bs.delete(blobUri);
8999
}
90100

91101
@Test
92102
public void testBlobStorageWithEcnryptionAndCompression() throws IOException, GeneralSecurityException
93103
{
104+
String expextedBlobUrl = "blob://"
105+
+ Configurator.getBlobStoreWriteProfileName() + "/"
106+
+ MESSAGE_ID + ":"
107+
+ URLEncoder.encode(MAILBOX.getId(), "UTF-8") + "?"
108+
+ BlobStoreConstants.URI_PARAM_COMPRESSION + "="
109+
+ DeflateCompressionHandler.COMPRESSION_TYPE_DEFLATE + "&"
110+
+ BlobStoreConstants.URI_PARAM_ENCRYPTION_KEY + "="
111+
+ Configurator.getBlobStoreDefaultEncryptionKeyAlias();
112+
94113
// BlobStorage with encryption or compression
95-
BlobStorage bs = new BlobStorage(new DeflateCompressionHandler(), new AESEncryptionHandler());
96-
String expextedBlobUrl = "blob://"+ Configurator.getBlobStoreWriteProfileName() + "/" + TEMP_BLOB + BlobStoreConstants.COMPRESS_SUFFIX;
114+
AbstractBlobStorage bs = new CloudBlobStorage(new DeflateCompressionHandler(), new AESEncryptionHandler());
97115

98116
// Write blob
99117
long origSize = testWrite(bs);
@@ -102,7 +120,7 @@ public void testBlobStorageWithEcnryptionAndCompression() throws IOException, Ge
102120
assertThat(blobUri.toString(), equalTo(expextedBlobUrl));
103121

104122
// Read blob back
105-
BlobDataSource ds = bs.read(blobUri, Configurator.getBlobStoreDefaultEncryptionKeyAlias());
123+
BlobDataSource ds = bs.read(blobUri);
106124

107125
// Verify that suffix matches
108126
assertThat(ds.isCompressed(), equalTo(true));
@@ -112,29 +130,25 @@ public void testBlobStorageWithEcnryptionAndCompression() throws IOException, Ge
112130
assertThat(compressedSize, lessThan(origSize));
113131

114132
// Read blob back again (can't reuse same InputStream)
115-
ds = bs.read(blobUri, Configurator.getBlobStoreDefaultEncryptionKeyAlias());
133+
ds = bs.read(blobUri);
116134
long newSize = IOUtils.getInputStreamSize(ds.getUncompressedInputStream());
117135

118136
// Check Blob size
119137
assertThat(newSize, equalTo(origSize));
120138

121139
// Delete
122-
testDelete();
140+
bs.delete(blobUri);
123141
}
124142

125-
private long testWrite(BlobStorage bs) throws IOException, GeneralSecurityException
143+
private long testWrite(AbstractBlobStorage bs) throws IOException, GeneralSecurityException
126144
{
127145
File file = new File(TEST_FILE);
128146
InputStream in = new FileInputStream(file);
129147

130-
blobUri = bs.write(TEMP_BLOB, Configurator.getBlobStoreWriteProfileName(), in, file.length());
148+
blobUri = bs.write(MESSAGE_ID, MAILBOX, Configurator.getBlobStoreWriteProfileName(), in, file.length());
131149
in.close();
132150

133151
return file.length();
134152
}
135153

136-
private void testDelete()
137-
{
138-
BlobStoreProxy.delete(blobUri);
139-
}
140154
}

0 commit comments

Comments
 (0)
This repository has been archived.