package com.linkedin.datahub.upgrade.system.elasticsearch.steps;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Function;


/**
 * Upgrade step that emits a {@link DataHubUpgradeHistoryEvent} carrying the configured version
 * through the supplied {@link KafkaEventProducer}.
 *
 * <p>Any exception raised while building or producing the event is logged and reported as a
 * {@code FAILED} step result rather than being propagated to the caller.
 */
@Slf4j
@RequiredArgsConstructor
public class DataHubStartupStep implements UpgradeStep {
  // Producer used to emit the upgrade history event.
  private final KafkaEventProducer _kafkaEventProducer;
  // Version string stamped onto the emitted event and included in the log line.
  private final String _version;

  /**
   * @return the fixed identifier of this step
   */
  @Override
  public String id() {
    return "DataHubStartupStep";
  }

  /**
   * @return the retry count for this step (3); NOTE(review): presumably consumed by the upgrade
   *     framework to decide how often to re-run a failed step — confirm against the runner
   */
  @Override
  public int retryCount() {
    return 3;
  }

  /**
   * @return a function that, when applied to an upgrade context, produces the history event and
   *     reports the outcome
   */
  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return this::emitUpgradeHistoryEvent;
  }

  /**
   * Builds the upgrade history event for the configured version and hands it to the producer.
   *
   * @param context the upgrade context supplied by the caller; not read by this step
   * @return a {@code SUCCEEDED} result when the event was produced and logged without an
   *     exception, otherwise a {@code FAILED} result
   */
  private UpgradeStepResult emitUpgradeHistoryEvent(UpgradeContext context) {
    UpgradeStepResult.Result outcome;
    try {
      DataHubUpgradeHistoryEvent historyEvent = new DataHubUpgradeHistoryEvent();
      historyEvent.setVersion(_version);
      _kafkaEventProducer.produceDataHubUpgradeHistoryEvent(historyEvent);
      log.info("Initiating startup for version: {}", _version);
      outcome = UpgradeStepResult.Result.SUCCEEDED;
    } catch (Exception e) {
      // Convert every failure into a FAILED result instead of letting it escape the step.
      log.error("DataHubStartupStep failed.", e);
      outcome = UpgradeStepResult.Result.FAILED;
    }
    return new DefaultUpgradeStepResult(id(), outcome);
  }
}
