Skip to content

Commit

Permalink
Fix synchronization issue in OpenTSDBMetricWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
tbadie authored and philwebb committed Sep 24, 2015
1 parent c629813 commit 2fd1cac
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -43,6 +44,7 @@
* task to flush periodically.
*
* @author Dave Syer
* @author Thomas Badie
* @since 1.3.0
*/
public class OpenTsdbMetricWriter implements MetricWriter {
Expand All @@ -67,7 +69,7 @@ public class OpenTsdbMetricWriter implements MetricWriter {
*/
private MediaType mediaType = MediaType.APPLICATION_JSON;

private List<OpenTsdbData> buffer = new ArrayList<OpenTsdbData>(this.bufferSize);
private final List<OpenTsdbData> buffer = new ArrayList<OpenTsdbData>(this.bufferSize);

private OpenTsdbNamingStrategy namingStrategy = new DefaultOpenTsdbNamingStrategy();

Expand Down Expand Up @@ -105,35 +107,42 @@ public void set(Metric<?> value) {
OpenTsdbData data = new OpenTsdbData(
this.namingStrategy.getName(value.getName()), value.getValue(), value
.getTimestamp().getTime());
this.buffer.add(data);
if (this.buffer.size() >= this.bufferSize) {
flush();
synchronized (this.buffer) {
this.buffer.add(data);
if (this.buffer.size() >= this.bufferSize) {
flush();
}
}
}

/**
* Flush the buffer without waiting for it to fill any further.
*/
@SuppressWarnings("rawtypes")
public void flush() {
if (this.buffer.isEmpty()) {
List<OpenTsdbData> snapshot = getBufferSnapshot();
if (snapshot.isEmpty()) {
return;
}
List<OpenTsdbData> temp = new ArrayList<OpenTsdbData>();
synchronized (this.buffer) {
temp.addAll(this.buffer);
this.buffer.clear();
}
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(this.mediaType));
headers.setContentType(this.mediaType);
HttpEntity<List<OpenTsdbData>> request = new HttpEntity<List<OpenTsdbData>>(temp,
headers);
@SuppressWarnings("rawtypes")
ResponseEntity<Map> response = this.restTemplate.postForEntity(this.url, request,
Map.class);
ResponseEntity<Map> response = this.restTemplate.postForEntity(this.url,
new HttpEntity<List<OpenTsdbData>>(snapshot, headers), Map.class);
if (!response.getStatusCode().is2xxSuccessful()) {
logger.warn("Cannot write metrics (discarded " + temp.size() + " values): "
+ response.getBody());
logger.warn("Cannot write metrics (discarded " + snapshot.size()
+ " values): " + response.getBody());
}
}

private List<OpenTsdbData> getBufferSnapshot() {
synchronized (this.buffer) {
if (this.buffer.isEmpty()) {
return Collections.emptyList();
}
List<OpenTsdbData> snapshot = new ArrayList<OpenTsdbData>(this.buffer);
this.buffer.clear();
return snapshot;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class OpenTsdbMetricWriterTests {

private OpenTsdbMetricWriter writer;

private RestOperations restTemplate = Mockito.mock(RestOperations.class);

@Before
Expand Down

0 comments on commit 2fd1cac

Please sign in to comment.