Skip to content

Commit

Permalink
SAMZA-889: Change log not working properly with In memory Store
Browse files Browse the repository at this point in the history
  • Loading branch information
Navina Ramesh authored and nickpan47 committed Jun 22, 2016
1 parent 94e4e39 commit 38e81c0
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 915 deletions.
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@

<allow class="org.apache.samza.SamzaException" />
<allow class="org.apache.samza.Partition" />

<subpackage name="kv">
<allow pkg="org.apache.samza.storage" />
</subpackage>
</subpackage>

<subpackage name="logging">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,8 @@ public interface StorageEngine {
*/
void stop();

/**
* Get store properties
*/
StoreProperties getStoreProperties();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public interface StorageEngineFactory<K, V> {
* @param containerContext Information about the container in which the task is executing.
* @return The storage engine instance.
*/
public StorageEngine getStorageEngine(
String storeName,
File storeDir,
Serde<K> keySerde,
Serde<V> msgSerde,
MessageCollector collector,
MetricsRegistry registry,
SystemStreamPartition changeLogSystemStreamPartition,
SamzaContainerContext containerContext);
StorageEngine getStorageEngine(
String storeName,
File storeDir,
Serde<K> keySerde,
Serde<V> msgSerde,
MessageCollector collector,
MetricsRegistry registry,
SystemStreamPartition changeLogSystemStreamPartition,
SamzaContainerContext containerContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;

/**
* Immutable class that defines the properties of a Store
*/
public class StoreProperties {
private final boolean persistedToDisk;
private final boolean loggedStore;

private StoreProperties(
final boolean persistedToDisk,
final boolean loggedStore) {
this.persistedToDisk = persistedToDisk;
this.loggedStore = loggedStore;
}

/**
* Flag to indicate whether a store can be persisted to disk or not
*
* @return True, if store can be flushed to disk. False, by default.
*/
public boolean isPersistedToDisk() {
return persistedToDisk;
}

/**
* Flag to indicate whether a store is associated with a changelog (used for recovery) or not
*
* @return True, if changelog is enabled. False, by default.
*/
public boolean isLoggedStore() {
return loggedStore;
}

public static class StorePropertiesBuilder {
private boolean persistedToDisk = false;
private boolean loggedStore = false;

public StorePropertiesBuilder setPersistedToDisk(boolean persistedToDisk) {
this.persistedToDisk = persistedToDisk;
return this;
}

public StorePropertiesBuilder setLoggedStore(boolean loggedStore) {
this.loggedStore = loggedStore;
return this;
}

public StoreProperties build() {
return new StoreProperties(persistedToDisk, loggedStore);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ package org.apache.samza.container
import java.io.File
import java.nio.file.Path
import java.util
import java.lang.Thread.UncaughtExceptionHandler
import java.net.{URL, UnknownHostException}
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.SerializerConfig.Config2Serializer
import org.apache.samza.config.ShellCommandConfig
Expand All @@ -32,38 +35,18 @@ import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
import org.apache.samza.container.disk.WatermarkDiskQuotaPolicy.Entry
import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, NoThrottlingDiskQuotaPolicy, WatermarkDiskQuotaPolicy, PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.JmxServer
import org.apache.samza.metrics.JvmMetrics
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.metrics.MetricsReporterFactory
import org.apache.samza.serializers.SerdeFactory
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.storage.StorageEngineFactory
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system.StreamMetadataCache
import org.apache.samza.system.SystemConsumers
import org.apache.samza.system.SystemConsumersMetrics
import org.apache.samza.system.SystemFactory
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemProducersMetrics
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MessageChooserFactory
import org.apache.samza.system.chooser.RoundRobinChooserFactory
import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.job.model.{ContainerModel, JobModel}
import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
import org.apache.samza.serializers.{SerdeFactory, SerdeManager}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager}
import org.apache.samza.system.{StreamMetadataCache, SystemConsumers, SystemConsumersMetrics, SystemFactory, SystemProducers, SystemProducersMetrics, SystemStream, SystemStreamPartition}
import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
import org.apache.samza.util.{ThrottlingExecutor, ExponentialSleepStrategy, Logging, Util}
import scala.collection.JavaConversions._
import java.net.{UnknownHostException, InetAddress, URL}
import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
import java.lang.Thread.UncaughtExceptionHandler

object SamzaContainer extends Logging {
val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
Expand Down
Loading

0 comments on commit 38e81c0

Please sign in to comment.