Skip to content

Commit

Permalink
Default statistics backward compatible change
Browse files Browse the repository at this point in the history
In the PR where we changed the repartitioning endpoint, we also
accidentally changed an undocumented behaviour that
some clients relied on: it used to be possible to update an event type
without providing default statistics.

We believe this behaviour should not be modified because this field is
optional, so some clients might not even support it (Nakadi UI, for
example).

We are also fixing a problem where repartitioning could be triggered
concurrently with an event type update, resulting in a race
condition on the event type and timeline changes in the database.
  • Loading branch information
rcillo committed Aug 28, 2020
1 parent a3e273e commit 51897f6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.EventTypeBase;
import org.zalando.nakadi.domain.EventTypeOptions;
import org.zalando.nakadi.domain.EventTypeStatistics;
import org.zalando.nakadi.domain.Feature;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.Timeline;
Expand Down Expand Up @@ -440,7 +439,7 @@ public void update(final String eventTypeName,
validateAudience(original, eventTypeBase);
partitionResolver.validate(eventTypeBase);
eventType = schemaEvolutionService.evolve(original, eventTypeBase);
validateStatisticsUpdate(original.getDefaultStatistic(), eventType.getDefaultStatistic());
validateStatisticsUpdate(original, eventType);
updateRetentionTime(original, eventType);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -561,9 +560,11 @@ private Multimap<TopicRepository, String> deleteEventType(final String eventType
}

private void validateStatisticsUpdate(
final EventTypeStatistics existing,
final EventTypeStatistics newStatistics) throws InvalidEventTypeException {
if (!Objects.equals(existing, newStatistics)) {
final EventType originalEventType,
final EventTypeBase newEventType) throws InvalidEventTypeException {
if (newEventType.getDefaultStatistic() == null) {
newEventType.setDefaultStatistic(originalEventType.getDefaultStatistic());
} else if (!Objects.equals(originalEventType.getDefaultStatistic(), newEventType.getDefaultStatistic())) {
throw new InvalidEventTypeException("default statistics must not be changed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,29 @@
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.exceptions.runtime.EventTypeDeletionException;
import org.zalando.nakadi.exceptions.runtime.EventTypeUnavailableException;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.InvalidEventTypeException;
import org.zalando.nakadi.exceptions.runtime.NakadiBaseException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.repository.db.EventTypeRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.service.subscription.LogPathBuilder;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.view.Cursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

@Service
Expand All @@ -46,6 +53,7 @@ public class RepartitioningService {
private final CursorConverter cursorConverter;
private final FeatureToggleService featureToggleService;
private final EventTypeCache eventTypeCache;
private final TimelineSync timelineSync;

@Autowired
public RepartitioningService(
Expand All @@ -57,7 +65,8 @@ public RepartitioningService(
final NakadiSettings nakadiSettings,
final CursorConverter cursorConverter,
final FeatureToggleService featureToggleService,
final EventTypeCache eventTypeCache) {
final EventTypeCache eventTypeCache,
final TimelineSync timelineSync) {
this.eventTypeRepository = eventTypeRepository;
this.timelineService = timelineService;
this.subscriptionRepository = subscriptionRepository;
Expand All @@ -67,6 +76,7 @@ public RepartitioningService(
this.cursorConverter = cursorConverter;
this.featureToggleService = featureToggleService;
this.eventTypeCache = eventTypeCache;
this.timelineSync = timelineSync;
}

public void repartition(final EventType eventType, final int partitions)
Expand All @@ -89,18 +99,44 @@ public void repartition(final EventType eventType, final int partitions)
}

LOG.info("Start repartitioning for {} to {} partitions", eventType.getName(), partitions);
timelineService.updateTimeLineForRepartition(eventType, partitions);

updateSubscriptionsForRepartitioning(eventType, partitions);

// it is clear that the operation has to be done under the lock with other related work for changing event type,
// but it is skipped, because it is quite rare operation to change event type and repartition at the same time
Closeable closeable = null;
try {
defaultStatistic.setReadParallelism(partitions);
defaultStatistic.setWriteParallelism(partitions);
eventTypeRepository.update(eventType);
} catch (Exception e) {
throw new NakadiBaseException(e.getMessage(), e);
closeable = timelineSync.workWithEventType(eventType.getName(), nakadiSettings.getTimelineWaitTimeoutMs());
timelineService.updateTimeLineForRepartition(eventType, partitions);

updateSubscriptionsForRepartitioning(eventType, partitions);

// it is clear that the operation has to be done under the lock with other related work for changing event
// type, but it is skipped, because it is quite rare operation to change event type and repartition at the
// same time
try {
defaultStatistic.setReadParallelism(partitions);
defaultStatistic.setWriteParallelism(partitions);
eventTypeRepository.update(eventType);
} catch (Exception e) {
throw new NakadiBaseException(e.getMessage(), e);
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Failed to wait for timeline switch", e);
throw new EventTypeUnavailableException("Event type " + eventType.getName()
+ " is currently in maintenance, please repeat request");
} catch (final TimeoutException e) {
LOG.error("Failed to wait for timeline switch", e);
throw new EventTypeUnavailableException("Event type " + eventType.getName()
+ " is currently in maintenance, please repeat request");
} catch (final InternalNakadiException | ServiceTemporarilyUnavailableException e) {
LOG.error("Error deleting event type " + eventType.getName(), e);
throw new EventTypeDeletionException("Failed to repartition event type " + eventType.getName());
} finally {
try {
if (closeable != null) {
closeable.close();
}
} catch (final IOException e) {
LOG.error("Exception occurred when releasing usage of event-type", e);
}
}
}

Expand Down

0 comments on commit 51897f6

Please sign in to comment.