diff --git a/CHANGELOG.md b/CHANGELOG.md index 595701b9..a42a165f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,12 @@ All notable changes to this project will be documented in this file. - Set `nifi.content.repository.archive.max.retention.period` to `3 days` (previously empty, which NiFi interprets as `Long.MAX_VALUE` and effectively disables time-based archive purge). Without a time-based ceiling, the content archive can grow to half the content PVC and accumulate millions of files, which makes the synchronous startup directory scan in `FileSystemRepository.initializeRepository` very slow. Users requiring a longer content-replay window can extend via `configOverrides`. The provenance audit trail is independent of this setting and unaffected ([#936]). - test: Bump vector-aggregator to 0.55.0, replace /graphql call with gRPC call ([#940]). +### Removed + +- BREAKING: Remove support for NiFi 1.x. + This removes the Prometheus reporting-task Job (and its `spec.clusterConfig.createReportingTaskJob` field), the pre-2.x non-rolling upgrade handling, the dedicated metrics port, and the sensitive-properties algorithms that were only supported on NiFi 1.x. + `status.deployed_version` is retained even though it no longer drives the (now removed) non-rolling upgrade state machine, as we don't want a breaking change to the status just for this ([#954]). + ### Fixed - Fix broken link to the NiFi authorization usage guide in the `spec.clusterConfig.authorization` CRD doc (`usage-guide` -> `usage_guide`) ([#924]). @@ -34,6 +40,7 @@ All notable changes to this project will be documented in this file. 
[#935]: https://github.com/stackabletech/nifi-operator/pull/935 [#936]: https://github.com/stackabletech/nifi-operator/pull/936 [#940]: https://github.com/stackabletech/nifi-operator/pull/940 +[#954]: https://github.com/stackabletech/nifi-operator/pull/954 ## [26.3.0] - 2026-03-16 diff --git a/deploy/helm/nifi-operator/templates/clusterrole-operator.yaml b/deploy/helm/nifi-operator/templates/clusterrole-operator.yaml index 38ecb56d..845b4ed5 100644 --- a/deploy/helm/nifi-operator/templates/clusterrole-operator.yaml +++ b/deploy/helm/nifi-operator/templates/clusterrole-operator.yaml @@ -81,17 +81,6 @@ rules: - list - patch - watch - # Optional reporting-task Job (NiFi 1.x only). Applied via SSA and tracked for orphan cleanup. - - apiGroups: - - batch - resources: - - jobs - verbs: - - create - - delete - - get - - list - - patch # PodDisruptionBudget created per role. Applied via SSA and tracked for orphan cleanup. - apiGroups: - policy diff --git a/docs/modules/nifi/pages/index.adoc b/docs/modules/nifi/pages/index.adoc index ead76339..f878e102 100644 --- a/docs/modules/nifi/pages/index.adoc +++ b/docs/modules/nifi/pages/index.adoc @@ -35,7 +35,8 @@ Every role group is accessible through it's own Service, and there is a Service == Dependencies -Apache NiFi 1.x depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[]. +NiFi builds its cluster quorum using Kubernetes by default and needs no additional dependencies. +It can optionally use Apache ZooKeeper instead, which you can run in Kubernetes with the xref:zookeeper:index.adoc[]. == [[demos]]Demos diff --git a/docs/modules/nifi/pages/usage_guide/clustering.adoc b/docs/modules/nifi/pages/usage_guide/clustering.adoc index 417c9f9b..e0485bfa 100644 --- a/docs/modules/nifi/pages/usage_guide/clustering.adoc +++ b/docs/modules/nifi/pages/usage_guide/clustering.adoc @@ -14,8 +14,6 @@ CAUTION: The cluster backend of an existing cluster should never be changed. 
Oth [#backend-kubernetes] == Kubernetes -NOTE: The Kubernetes provider is only supported by Apache NiFi 2.0 or newer. When using NiFi 1.x, use the xref:#backend-zookeeper[] backend instead. - The Kubernetes backend is used by default (unless the xref:#backend-zookeeper[] backend is configured), and stores all state in Kubernetes objects, in the same namespace as the `NifiCluster` object. It takes no configuration. diff --git a/docs/modules/nifi/pages/usage_guide/monitoring.adoc b/docs/modules/nifi/pages/usage_guide/monitoring.adoc index 3a093f87..a7474b2d 100644 --- a/docs/modules/nifi/pages/usage_guide/monitoring.adoc +++ b/docs/modules/nifi/pages/usage_guide/monitoring.adoc @@ -1,39 +1,8 @@ = Monitoring :description: The Stackable Operator for Apache NiFi automatically configures NiFi to export Prometheus metrics. -:k8s-job: https://kubernetes.io/docs/concepts/workloads/controllers/job/ -:k8s-network-policies: https://kubernetes.io/docs/concepts/services-networking/network-policies/ :prometheus-operator: https://prometheus-operator.dev/ -In November 2024, Apache NiFi released a new major version https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version2.0.0[`2.0.0`,window=_blank]. - -The NiFi `2.0.0` release changed the way of exposing Prometheus metrics significantly. -The following steps explain on how to expose Metrics in NiFi versions `1.x.x` and `2.x.x`. - -== Configure metrics in NiFi `1.x.x` - -For NiFi versions `1.x.x`, the operator automatically configures NiFi to export Prometheus metrics. -This is done by creating a {k8s-job}[Job,window=_blank] that connects to NiFi and configures a https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-prometheus-nar/1.26.0/org.apache.nifi.reporting.prometheus.PrometheusReportingTask/index.html[Prometheus Reporting Task,window=_blank]. - -IMPORTANT: Network access from the Job to NiFi is required. 
-If you are running a Kubernetes with restrictive {k8s-network-policies}[NetworkPolicies,window=_blank], make sure to allow access from the Job to NiFi. - -See xref:operators:monitoring.adoc[] for more details. - -== Disabling create-reporting-task Job - -It can be helpful to disable the Job, e.g. when you configOverride an authentication mechanism, which the Job currently cannot use to authenticate against NiFi. - -To achieve this use the following configuration: - -[source,yaml] ----- -spec: - clusterConfig: - createReportingTaskJob: - enabled: false ----- - -== Configure metrics in NiFi `2.x.x` +== Configure metrics The Prometheus Reporting Task was removed in NiFi `2.x.x` in https://issues.apache.org/jira/browse/NIFI-13507[NIFI-13507,window=_blank]. Metrics are now always exposed and can be scraped using the NiFi `metrics` Service and the HTTP path `/nifi-api/flow/metrics/prometheus`. diff --git a/docs/modules/nifi/pages/usage_guide/overrides.adoc b/docs/modules/nifi/pages/usage_guide/overrides.adoc index 32c0857b..fb4c32e8 100644 --- a/docs/modules/nifi/pages/usage_guide/overrides.adoc +++ b/docs/modules/nifi/pages/usage_guide/overrides.adoc @@ -96,25 +96,6 @@ Pod overrides allow you to configure any attributes that can be configured on a Read the xref:concepts:overrides.adoc#pod-overrides[Pod overrides concepts page] to learn more. -=== Pod overrides on create-reporting-task Job - -In addition to podOverrides on the created StatefulSet we also support podOverrides on the created Kubernetes Job, which enables the export of Prometheus metrics within NiFi. 
- -[source,yaml] ----- -spec: - clusterConfig: - createReportingTaskJob: - # enabled: false # You can also turn off the Job entirely - podOverrides: # podOverrides as usual - spec: - tolerations: - - key: "key1" - operator: "Equal" - value: "value1" - effect: "NoSchedule" ----- - == JVM argument overrides Stackable operators automatically determine the set of needed JVM arguments, such as memory settings or trust- and keystores. diff --git a/docs/modules/nifi/pages/usage_guide/updating.adoc b/docs/modules/nifi/pages/usage_guide/updating.adoc index f290add4..4df8ef62 100644 --- a/docs/modules/nifi/pages/usage_guide/updating.adoc +++ b/docs/modules/nifi/pages/usage_guide/updating.adoc @@ -17,16 +17,4 @@ spec: <1> Change the NiFi version here -[WARNING] -==== -NiFi clusters cannot be upgraded or downgraded in a rolling fashion due to a limitation in NiFi prior to version 2. - -When upgrading between NiFi 1 versions or from NiFi 1 to NiFi 2, any change to the NiFi version in the CRD triggers a full cluster restart with brief downtime. -However, the Stackable image version can be updated in a rolling manner, provided the NiFi version remains unchanged. - -Since NiFi version 2, rolling upgrades are supported. -==== - -== NiFi 2.0.0 - -Before you can upgrade to `2.0.0` you https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance[need to update] to at least version 1.27.x! +Rolling upgrades are supported, so changing the NiFi version in the CRD updates the cluster without downtime. diff --git a/extra/crds.yaml b/extra/crds.yaml index b7200414..eec97136 100644 --- a/extra/crds.yaml +++ b/extra/crds.yaml @@ -139,33 +139,6 @@ spec: - accessPolicyProvider type: object type: object - createReportingTaskJob: - default: - enabled: true - podOverrides: {} - description: |- - This section creates a `create-reporting-task` Kubernetes Job, which enables the export of - Prometheus metrics within NiFi. 
- properties: - enabled: - default: true - description: |- - Whether the Kubernetes Job should be created, defaults to true. It can be helpful to disable - the Job, e.g. when you configOverride an authentication mechanism, which the Job currently - can't use to authenticate against NiFi. - type: boolean - podOverrides: - default: {} - description: |- - Here you can define a - [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) - to override any property that can be set on the Pod of the create-reporting-task Kubernetes Job. - Read the - [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) - for more information. - type: object - x-kubernetes-preserve-unknown-fields: true - type: object customComponentsGitSync: default: [] description: |- @@ -347,26 +320,11 @@ spec: `nifiPbkdf2AesGcm256` (the default value), `nifiArgon2AesGcm256`, - The following algorithms are deprecated and will be removed in future versions: - - `nifiArgon2AesGcm128`, - `nifiBcryptAesGcm128`, - `nifiBcryptAesGcm256`, - `nifiPbkdf2AesGcm128`, - `nifiScryptAesGcm128`, - `nifiScryptAesGcm256`. - Learn more about the specifics of the algorithm parameters in the [NiFi documentation](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#property-encryption-algorithms). enum: - nifiPbkdf2AesGcm256 - nifiArgon2AesGcm256 - - nifiBcryptAesGcm128 - - nifiBcryptAesGcm256 - - nifiPbkdf2AesGcm128 - - nifiArgon2AesGcm128 - - nifiScryptAesGcm128 - - nifiScryptAesGcm256 - null nullable: true type: string @@ -413,8 +371,7 @@ spec: When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - The Kubernetes provider will be used if this field is unset. Kubernetes is only supported for NiFi 2.x and newer, - NiFi 1.x requires ZooKeeper. 
+ The Kubernetes provider will be used if this field is unset. type: string required: - authentication diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 5e4d1945..ac87b8b2 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -5,7 +5,7 @@ use std::{ use jvm::build_merged_jvm_config; use product_config::{ProductConfigManager, types::PropertyNameKind}; -use snafu::{ResultExt, Snafu, ensure}; +use snafu::{ResultExt, Snafu}; use stackable_operator::{ commons::resources::Resources, crd::git_sync, @@ -20,7 +20,6 @@ use strum::{Display, EnumIter}; use crate::{ crd::{ HTTPS_PORT, NifiConfig, NifiRole, NifiRoleType, NifiStorageConfig, PROTOCOL_PORT, - sensitive_properties, v1alpha1::{self, NifiClusteringBackend}, }, operations::graceful_shutdown::graceful_shutdown_config_properties, @@ -101,14 +100,6 @@ pub enum Error { #[snafu(display("failed to generate OIDC config"))] GenerateOidcConfig { source: oidc::Error }, - - #[snafu(display( - "NiFi 1.x requires ZooKeeper (hint: upgrade to NiFi 2.x or set .spec.clusterConfig.zookeeperConfigMapName)" - ))] - Nifi1RequiresZookeeper, - - #[snafu(display("failed to configure sensitive properties"))] - ConfigureSensitiveProperties { source: sensitive_properties::Error }, } /// Create the NiFi bootstrap.conf @@ -156,35 +147,14 @@ pub fn build_nifi_properties( proxy_hosts: &str, auth_config: &NifiAuthenticationConfig, overrides: BTreeMap, - product_version: &str, git_sync_resources: &git_sync::v1alpha2::GitSyncResources, ) -> Result { - // TODO: Remove once we dropped support for all NiFi 1.x versions - let is_nifi_1 = product_version.starts_with("1."); - let mut properties = BTreeMap::new(); // Core Properties - // According to https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance#MigrationGuidance-Migratingto2.0.0-M1 - // The nifi.flow.configuration.file property in nifi.properties must be changed to reference - // 
"flow.json.gz" instead of "flow.xml.gz" - // TODO: Remove once we dropped support for all 1.x.x versions - // TODO(malte): In order to use CLI tools like: ./bin/nifi.sh set-sensitive-properties-algorithm NIFI_PBKDF2_AES_GCM_256 - // we have to set both "nifi.flow.configuration.file" and "nifi.flow.configuration.json.file" in NiFi 1.x.x. - if is_nifi_1 { - properties.insert( - "nifi.flow.configuration.file".to_string(), - NifiRepository::Database.mount_path() + "/flow.xml.gz", - ); - properties.insert( - "nifi.flow.configuration.json.file".to_string(), - NifiRepository::Database.mount_path() + "/flow.json.gz", - ); - } else { - properties.insert( - "nifi.flow.configuration.file".to_string(), - NifiRepository::Database.mount_path() + "/flow.json.gz", - ); - } + properties.insert( + "nifi.flow.configuration.file".to_string(), + NifiRepository::Database.mount_path() + "/flow.json.gz", + ); properties.insert( "nifi.flow.configuration.archive.enabled".to_string(), @@ -535,10 +505,6 @@ pub fn build_nifi_properties( .clone() .unwrap_or_default(); - sensitive_properties_algorithm - .check_for_nifi_version(spec.image.product_version()) - .context(ConfigureSensitivePropertiesSnafu)?; - properties.insert( "nifi.sensitive.props.algorithm".to_string(), sensitive_properties_algorithm.to_string(), @@ -635,8 +601,6 @@ pub fn build_nifi_properties( } v1alpha1::NifiClusteringBackend::Kubernetes {} => { - ensure!(!is_nifi_1, Nifi1RequiresZookeeperSnafu); - properties.insert( "nifi.cluster.leader.election.implementation".to_string(), "KubernetesLeaderElectionManager".to_string(), @@ -653,9 +617,6 @@ pub fn build_nifi_properties( //#################### // Custom components # //#################### - // NiFi 1.x does not support Python components and the Python configuration below is just - // ignored. - // The command used to launch Python. // This property must be set to enable Python-based processors. 
properties.insert("nifi.python.command".to_string(), "python3".to_string()); diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index a2b0cd52..bd321d4a 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -84,21 +84,16 @@ use crate::{ }, crd::{ APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME, Container, HTTPS_PORT, HTTPS_PORT_NAME, - METRICS_PORT, METRICS_PORT_NAME, NifiConfig, NifiNodeRoleConfig, NifiRole, NifiRoleType, - NifiStatus, PROTOCOL_PORT, PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, + NifiConfig, NifiNodeRoleConfig, NifiRole, NifiRoleType, NifiStatus, PROTOCOL_PORT, + PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, authorization::NifiAccessPolicyProvider, v1alpha1, }, listener::{ LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener, build_group_listener_pvc, group_listener_name, }, - operations::{ - graceful_shutdown::add_graceful_shutdown_config, - pdb::add_pdbs, - upgrade::{self, ClusterVersionUpdateState}, - }, + operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::extend_role_group_config_map, - reporting_task::{build_maybe_reporting_task, build_reporting_task_service_name}, security::{ authentication::{ AUTHORIZERS_XML_FILE_NAME, LOGIN_IDENTITY_PROVIDERS_XML_FILE_NAME, @@ -189,16 +184,6 @@ pub enum Error { rolegroup: RoleGroupRef, }, - #[snafu(display("failed to apply create ReportingTask service"))] - ApplyCreateReportingTaskService { - source: stackable_operator::cluster_resources::Error, - }, - - #[snafu(display("failed to apply create ReportingTask job"))] - ApplyCreateReportingTaskJob { - source: stackable_operator::cluster_resources::Error, - }, - #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, @@ -309,11 +294,6 @@ pub enum Error { #[snafu(display("security 
failure"))] Security { source: crate::security::Error }, - #[snafu(display("reporting task failure"))] - ReportingTask { - source: crate::reporting_task::Error, - }, - #[snafu(display("failed to configure logging"))] ConfigureLogging { source: LoggingError }, @@ -325,9 +305,6 @@ pub enum Error { source: builder::pod::container::Error, }, - #[snafu(display("Failed to determine the state of the version upgrade procedure"))] - ClusterVersionUpdateState { source: upgrade::Error }, - #[snafu(display("failed to apply group listener"))] ApplyGroupListener { source: stackable_operator::cluster_resources::Error, @@ -375,7 +352,6 @@ pub async fn reconcile_nifi( &dereferenced_objects, &ctx.operator_environment, &ctx.product_config, - &client.kubernetes_cluster_info, ) .context(ValidateClusterSnafu)?; @@ -390,34 +366,6 @@ pub async fn reconcile_nifi( .await .context(SecuritySnafu)?; - // If rolling upgrade is supported, kubernetes takes care of the cluster scaling automatically - // otherwise the operator handles it - // manage our own flow for upgrade from 1.x.x to 1.x.x/2.x.x - // TODO: this can be removed once 1.x.x is longer supported - let mut cluster_version_update_state = ClusterVersionUpdateState::NoVersionChange; - let deployed_version = nifi - .status - .as_ref() - .and_then(|status| status.deployed_version.as_ref()); - let rolling_upgrade_supported = resolved_product_image.product_version.starts_with("2.") - && deployed_version.is_some_and(|v| v.starts_with("2.")); - - if !rolling_upgrade_supported { - cluster_version_update_state = upgrade::cluster_version_update_state( - nifi, - client, - &resolved_product_image.product_version, - deployed_version, - ) - .await - .context(ClusterVersionUpdateStateSnafu)?; - - if cluster_version_update_state == ClusterVersionUpdateState::UpdateInProgress { - return Ok(Action::await_change()); - } - } - // end todo - let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, @@ -521,12 +469,7 @@ pub async fn 
reconcile_nifi( .await?; let role_group = role.role_groups.get(&rolegroup.role_group); - let replicas = - if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested { - Some(0) - } else { - role_group.and_then(|rg| rg.replicas).map(i32::from) - }; + let replicas = role_group.and_then(|rg| rg.replicas).map(i32::from); let rg_statefulset = build_node_rolegroup_statefulset( nifi, @@ -538,7 +481,6 @@ pub async fn reconcile_nifi( &merged_config, &authentication_config, &authorization_config, - rolling_upgrade_supported, replicas, &rbac_sa.name_any(), &git_sync_resources, @@ -550,7 +492,6 @@ pub async fn reconcile_nifi( &rolegroup, role_group_service_recommended_labels, role_group_service_selector.into(), - &resolved_product_image.product_version, ) .context(ServiceConfigurationSnafu)?; @@ -624,29 +565,6 @@ pub async fn reconcile_nifi( .context(ApplyGroupListenerSnafu)?; } - // Only add the reporting task in case it is enabled. - if nifi.spec.cluster_config.create_reporting_task_job.enabled { - if let Some((reporting_task_job, reporting_task_service)) = build_maybe_reporting_task( - nifi, - &resolved_product_image, - &client.kubernetes_cluster_info, - &authentication_config, - &rbac_sa.name_any(), - ) - .context(ReportingTaskSnafu)? 
- { - cluster_resources - .add(client, reporting_task_service) - .await - .context(ApplyCreateReportingTaskServiceSnafu)?; - - cluster_resources - .add(client, reporting_task_job) - .await - .context(ApplyCreateReportingTaskJobSnafu)?; - } - } - // Remove any orphaned resources that still exist in k8s, but have not been added to // the cluster resources during the reconciliation // TODO: this doesn't cater for a graceful cluster shrink, for that we'd need to predict @@ -664,19 +582,9 @@ pub async fn reconcile_nifi( - // Update the deployed product version in the status after everything has been deployed, unless - // we are still in the process of updating - let status = if cluster_version_update_state != ClusterVersionUpdateState::UpdateRequested { - NifiStatus { - deployed_version: Some(resolved_product_image.product_version), - conditions, - } - } else { - NifiStatus { - deployed_version: nifi - .status - .as_ref() - .and_then(|status| status.deployed_version.clone()), - conditions, - } + // Update the deployed product version in the status after everything has been deployed. + // The field no longer drives any upgrade logic and is only kept for status compatibility. + let status = NifiStatus { + deployed_version: Some(resolved_product_image.product_version), + conditions, }; client @@ -766,7 +674,6 @@ async fn build_node_rolegroup_config_map( kind: NIFI_PROPERTIES.to_string(), })? .clone(), - resolved_product_image.product_version.as_ref(), git_sync_resources, ) .with_context(|_| BuildProductConfigSnafu { @@ -821,7 +728,6 @@ async fn build_node_rolegroup_statefulset( merged_config: &NifiConfig, authentication_config: &NifiAuthenticationConfig, authorization_config: &ResolvedNifiAuthorizationConfig, - rolling_update_supported: bool, replicas: Option, service_account_name: &str, git_sync_resources: &git_sync::v1alpha2::GitSyncResources, @@ -1148,11 +1054,6 @@ async fn build_node_rolegroup_statefulset( }) .resources(merged_config.resources.clone().into()); - // NiFi 2.x.x offers nifi-api/flow/metrics/prometheus at the HTTPS_PORT, therefore METRICS_PORT is only required for NiFi 1.x.x. 
- if resolved_product_image.product_version.starts_with("1.") { - container_nifi.add_container_port(METRICS_PORT_NAME, METRICS_PORT.into()); - } - let mut pod_builder = PodBuilder::new(); let recommended_object_labels = build_recommended_labels( @@ -1288,7 +1189,6 @@ async fn build_node_rolegroup_statefulset( let requested_secret_lifetime = merged_config .requested_secret_lifetime .context(MissingSecretLifetimeSnafu)?; - let nifi_cluster_name = nifi.name_any(); pod_builder .metadata(metadata) .image_pull_secrets_from_product_image(resolved_product_image) @@ -1331,10 +1231,7 @@ async fn build_node_rolegroup_statefulset( build_tls_volume( nifi, KEYSTORE_VOLUME_NAME, - [ - rolegroup_ref.rolegroup_metrics_service_name(), - build_reporting_task_service_name(&nifi_cluster_name), - ], + [rolegroup_ref.rolegroup_metrics_service_name()], SecretFormat::TlsPkcs12, &requested_secret_lifetime, Some(LISTENER_VOLUME_NAME), @@ -1418,11 +1315,7 @@ async fn build_node_rolegroup_statefulset( service_name: Some(rolegroup_ref.rolegroup_headless_service_name()), template: pod_template, update_strategy: Some(StatefulSetUpdateStrategy { - type_: if rolling_update_supported { - Some("RollingUpdate".to_string()) - } else { - Some("OnDelete".to_string()) - }, + type_: Some("RollingUpdate".to_string()), ..StatefulSetUpdateStrategy::default() }), volume_claim_templates: Some(get_volume_claim_templates( diff --git a/rust/operator-binary/src/controller/validate.rs b/rust/operator-binary/src/controller/validate.rs index e0b6d3e4..d1dcda3a 100644 --- a/rust/operator-binary/src/controller/validate.rs +++ b/rust/operator-binary/src/controller/validate.rs @@ -11,15 +11,13 @@ use stackable_operator::{ cli::OperatorEnvironmentOptions, commons::product_image_selection::{self, ResolvedProductImage}, product_config_utils::ValidatedRoleConfigByPropertyKind, - utils::cluster_info::KubernetesClusterInfo, }; use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ config::{self, validated_product_config}, 
controller::dereference::DereferencedObjects, - crd::{HTTPS_PORT, v1alpha1}, - reporting_task, + crd::v1alpha1, security::{ authentication::{self, NifiAuthenticationConfig}, authorization::ResolvedNifiAuthorizationConfig, @@ -46,11 +44,6 @@ pub enum Error { #[snafu(source(from(config::Error, Box::new)))] source: Box, }, - - #[snafu(display("failed to build reporting task service name"))] - ReportingTask { - source: crate::reporting_task::Error, - }, } type Result = std::result::Result; @@ -71,7 +64,6 @@ pub fn validate( dereferenced_objects: &DereferencedObjects, operator_environment: &OperatorEnvironmentOptions, product_config: &ProductConfigManager, - cluster_info: &KubernetesClusterInfo, ) -> Result { let image = nifi .spec @@ -100,7 +92,7 @@ pub fn validate( ) .context(ProductConfigLoadFailedSnafu)?; - let proxy_hosts = compute_proxy_hosts(nifi, cluster_info)?; + let proxy_hosts = compute_proxy_hosts(nifi)?; Ok(ValidatedInputs { image, @@ -111,10 +103,7 @@ pub fn validate( }) } -fn compute_proxy_hosts( - nifi: &v1alpha1::NifiCluster, - cluster_info: &KubernetesClusterInfo, -) -> Result { +fn compute_proxy_hosts(nifi: &v1alpha1::NifiCluster) -> Result { let host_header_check = &nifi.spec.cluster_config.host_header_check; if host_header_check.allow_all { @@ -135,15 +124,6 @@ fn compute_proxy_hosts( ]); proxy_hosts.extend(host_header_check.additional_allowed_hosts.iter().cloned()); - // Reporting task only exists for NiFi 1.x - if nifi.spec.image.product_version().starts_with("1.") { - let reporting_task_service_name = - reporting_task::build_reporting_task_fqdn_service_name(nifi, cluster_info) - .context(ReportingTaskSnafu)?; - - proxy_hosts.insert(format!("{reporting_task_service_name}:{HTTPS_PORT}")); - } - let mut proxy_hosts = Vec::from_iter(proxy_hosts); proxy_hosts.sort(); diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index b84b5da7..8ce0d8cd 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ 
b/rust/operator-binary/src/crd/mod.rs @@ -27,10 +27,7 @@ use stackable_operator::{ config_overrides::{KeyValueConfigOverrides, KeyValueOverridesProvider}, crd::{authentication::core as auth_core, git_sync}, deep_merger::ObjectOverrides, - k8s_openapi::{ - api::core::v1::{PodTemplateSpec, Volume}, - apimachinery::pkg::api::resource::Quantity, - }, + k8s_openapi::{api::core::v1::Volume, apimachinery::pkg::api::resource::Quantity}, kube::{CustomResource, ResourceExt, runtime::reflector::ObjectRef}, memory::MemoryQuantity, product_config_utils::{self, Configuration}, @@ -39,7 +36,7 @@ use stackable_operator::{ schemars::{self, JsonSchema}, shared::time::Duration, status::condition::{ClusterCondition, HasStatusCondition}, - utils::crds::{raw_object_list_schema, raw_object_schema}, + utils::crds::raw_object_list_schema, versioned::versioned, }; use tls::NifiTls; @@ -54,8 +51,6 @@ pub const PROTOCOL_PORT_NAME: &str = "protocol"; pub const PROTOCOL_PORT: u16 = 9088; pub const BALANCE_PORT_NAME: &str = "balance"; pub const BALANCE_PORT: u16 = 6243; -pub const METRICS_PORT_NAME: &str = "metrics"; -pub const METRICS_PORT: u16 = 8081; pub const STACKABLE_LOG_DIR: &str = "/stackable/log"; pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config"; @@ -167,10 +162,6 @@ pub mod versioned { #[serde(default, skip_serializing_if = "Vec::is_empty")] #[schemars(schema_with = "raw_object_list_schema")] pub extra_volumes: Vec, - - // Docs are on the struct - #[serde(default)] - pub create_reporting_task_job: CreateReportingTaskJob, } // This is flattened in for backwards compatibility reasons, `zookeeper_config_map_name` already existed and used to be mandatory. @@ -185,8 +176,7 @@ pub mod versioned { /// When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. /// - /// The Kubernetes provider will be used if this field is unset. 
Kubernetes is only supported for NiFi 2.x and newer, - /// NiFi 1.x requires ZooKeeper. + /// The Kubernetes provider will be used if this field is unset. zookeeper_config_map_name: String, }, Kubernetes {}, @@ -316,43 +306,6 @@ pub fn default_allow_all() -> bool { true } -/// This section creates a `create-reporting-task` Kubernetes Job, which enables the export of -/// Prometheus metrics within NiFi. -#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateReportingTaskJob { - /// Whether the Kubernetes Job should be created, defaults to true. It can be helpful to disable - /// the Job, e.g. when you configOverride an authentication mechanism, which the Job currently - /// can't use to authenticate against NiFi. - #[serde(default = "CreateReportingTaskJob::default_enabled")] - pub enabled: bool, - - /// Here you can define a - /// [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) - /// to override any property that can be set on the Pod of the create-reporting-task Kubernetes Job. - /// Read the - /// [Pod overrides documentation](DOCS_BASE_URL_PLACEHOLDER/concepts/overrides#pod-overrides) - /// for more information. 
- #[serde(default)] - #[schemars(schema_with = "raw_object_schema")] - pub pod_overrides: PodTemplateSpec, -} - -impl Default for CreateReportingTaskJob { - fn default() -> Self { - Self { - enabled: Self::default_enabled(), - pod_overrides: Default::default(), - } - } -} - -impl CreateReportingTaskJob { - const fn default_enabled() -> bool { - true - } -} - #[derive(strum::Display)] #[strum(serialize_all = "camelCase")] pub enum NifiRole { diff --git a/rust/operator-binary/src/crd/sensitive_properties.rs b/rust/operator-binary/src/crd/sensitive_properties.rs index 498260a6..54fbb3b8 100644 --- a/rust/operator-binary/src/crd/sensitive_properties.rs +++ b/rust/operator-binary/src/crd/sensitive_properties.rs @@ -1,15 +1,6 @@ use serde::{Deserialize, Serialize}; -use snafu::Snafu; use stackable_operator::schemars::{self, JsonSchema}; -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display( - "The sensitive properties algorithm '{algorithm}' is not supported in NiFi 2.X.X. Please see https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#updating-the-sensitive-properties-algorithm on how to upgrade the algorithm." - ))] - UnsupportedSensitivePropertiesAlgorithm { algorithm: String }, -} - /// These settings configure the encryption of sensitive properties in NiFi processors. /// NiFi supports encrypting sensitive properties in processors as they are written to disk. /// You can configure the encryption algorithm and the key to use. @@ -34,15 +25,6 @@ pub struct NifiSensitivePropertiesConfig { /// `nifiPbkdf2AesGcm256` (the default value), /// `nifiArgon2AesGcm256`, /// - /// The following algorithms are deprecated and will be removed in future versions: - /// - /// `nifiArgon2AesGcm128`, - /// `nifiBcryptAesGcm128`, - /// `nifiBcryptAesGcm256`, - /// `nifiPbkdf2AesGcm128`, - /// `nifiScryptAesGcm128`, - /// `nifiScryptAesGcm256`. 
- /// /// Learn more about the specifics of the algorithm parameters in the /// [NiFi documentation](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#property-encryption-algorithms). pub algorithm: Option, @@ -53,65 +35,10 @@ pub struct NifiSensitivePropertiesConfig { )] #[serde(rename_all = "camelCase")] pub enum NifiSensitiveKeyAlgorithm { - // supported in v2 #[strum(serialize = "NIFI_PBKDF2_AES_GCM_256")] NifiPbkdf2AesGcm256, - // supported in v2 #[default] #[strum(serialize = "NIFI_ARGON2_AES_GCM_256")] NifiArgon2AesGcm256, - - // Deprecated in v1 -> can be removed when 1.x.x is no longer supported - #[strum(serialize = "NIFI_BCRYPT_AES_GCM_128")] - NifiBcryptAesGcm128, - - // Deprecated in v1 -> can be removed when 1.x.x is no longer supported - #[strum(serialize = "NIFI_BCRYPT_AES_GCM_256")] - NifiBcryptAesGcm256, - - // Deprecated in v1 -> can be removed when 1.x.x is no longer supported - #[strum(serialize = "NIFI_PBKDF2_AES_GCM_128")] - NifiPbkdf2AesGcm128, - - // Deprecated in v1 -> can be removed when 1.x.x is no longer supported - #[strum(serialize = "NIFI_ARGON2_AES_GCM_128")] - NifiArgon2AesGcm128, - - // Deprecated in v1 -> can be removed when 1.x.x is no longer supported - #[strum(serialize = "NIFI_SCRYPT_AES_GCM_128")] - NifiScryptAesGcm128, - - // Deprecated in v1 -> can be removed when 1.x.x is no longer supported - #[strum(serialize = "NIFI_SCRYPT_AES_GCM_256")] - NifiScryptAesGcm256, -} - -impl NifiSensitiveKeyAlgorithm { - /// Checks if the used encryption algorithm is supported or deprecated. - /// Will warn for deprecation and error out for missing support. 
- pub fn check_for_nifi_version(&self, product_version: &str) -> Result<(), Error> { - let algorithm = self.to_string(); - - match self { - // Allowed and supported in NiFi 1.x.x and 2.x.x - NifiSensitiveKeyAlgorithm::NifiPbkdf2AesGcm256 - | NifiSensitiveKeyAlgorithm::NifiArgon2AesGcm256 => {} - // All others are deprecated in 1.x.x and removed in 2.x.x - // see https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#property-encryption-algorithms - _ => { - if product_version.starts_with("1.") { - tracing::warn!( - "You are using a deprecated sensitive properties algorithm '{algorithm}'. Please update to '{pbkd}' or '{argon}'.", - pbkd = NifiSensitiveKeyAlgorithm::NifiPbkdf2AesGcm256, - argon = NifiSensitiveKeyAlgorithm::NifiArgon2AesGcm256 - ) - } else { - return Err(Error::UnsupportedSensitivePropertiesAlgorithm { algorithm }); - } - } - } - - Ok(()) - } } diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index d20eea0d..1a0b2b4e 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -44,7 +44,6 @@ mod crd; mod listener; mod operations; mod product_logging; -mod reporting_task; mod security; mod service; mod webhooks; diff --git a/rust/operator-binary/src/operations/mod.rs b/rust/operator-binary/src/operations/mod.rs index c62006d9..92ca2ec7 100644 --- a/rust/operator-binary/src/operations/mod.rs +++ b/rust/operator-binary/src/operations/mod.rs @@ -1,3 +1,2 @@ pub mod graceful_shutdown; pub mod pdb; -pub mod upgrade; diff --git a/rust/operator-binary/src/operations/upgrade.rs b/rust/operator-binary/src/operations/upgrade.rs deleted file mode 100644 index 227405ba..00000000 --- a/rust/operator-binary/src/operations/upgrade.rs +++ /dev/null @@ -1,124 +0,0 @@ -// TODO: This module can be removed once we don't support NiFi 1.x versions anymore -// It manages the version upgrade procedure for NiFi versions prior to NiFi 2, since rolling upgrade is not supported there yet - -use 
snafu::{OptionExt, ResultExt, Snafu}; -use stackable_operator::{ - client::Client, - k8s_openapi::{api::apps::v1::StatefulSet, apimachinery::pkg::apis::meta::v1::LabelSelector}, - kvp::Labels, -}; - -use crate::crd::{APP_NAME, NifiRole, v1alpha1}; - -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display("object defines no namespace"))] - ObjectHasNoNamespace, - - #[snafu(display("failed to fetch deployed StatefulSets"))] - FetchStatefulsets { - source: stackable_operator::client::Error, - }, - - #[snafu(display("failed to build labels"))] - LabelBuild { - source: stackable_operator::kvp::LabelError, - }, -} - -type Result = std::result::Result; - -// This struct is used for NiFi versions not supporting rolling upgrades since in that case -// we have to manage the restart process ourselves and need to track the state of it -#[derive(Debug, PartialEq, Eq)] -pub enum ClusterVersionUpdateState { - UpdateRequested, - UpdateInProgress, - ClusterStopped, - NoVersionChange, -} - -pub async fn cluster_version_update_state( - nifi: &v1alpha1::NifiCluster, - client: &Client, - resolved_version: &String, - deployed_version: Option<&String>, -) -> Result { - let namespace = &nifi - .metadata - .namespace - .clone() - .with_context(|| ObjectHasNoNamespaceSnafu {})?; - - // Handle full restarts for a version change - match deployed_version { - Some(deployed_version) => { - if deployed_version != resolved_version { - // Check if statefulsets are already scaled to zero, if not - requeue - let selector = LabelSelector { - match_expressions: None, - match_labels: Some( - Labels::role_selector(nifi, APP_NAME, &NifiRole::Node.to_string()) - .context(LabelBuildSnafu)? 
- .into(), - ), - }; - - // Retrieve the deployed statefulsets to check on the current status of the restart - let deployed_statefulsets = client - .list_with_label_selector::(namespace, &selector) - .await - .context(FetchStatefulsetsSnafu)?; - - // Sum target replicas for all statefulsets - let target_replicas = deployed_statefulsets - .iter() - .filter_map(|statefulset| statefulset.spec.as_ref()) - .filter_map(|spec| spec.replicas) - .sum::(); - - // Sum current ready replicas for all statefulsets - let current_replicas = deployed_statefulsets - .iter() - .filter_map(|statefulset| statefulset.status.as_ref()) - .map(|status| status.replicas) - .sum::(); - - // If statefulsets have already been scaled to zero, but have remaining replicas - // we requeue to wait until a full stop has been performed. - if target_replicas == 0 && current_replicas > 0 { - tracing::info!( - "Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeuing to wait for shutdown to finish", - current_replicas - ); - return Ok(ClusterVersionUpdateState::UpdateInProgress); - } - - // Otherwise we either still need to scale the statefulsets to 0 or all replicas have - // been stopped and we can restart the cluster. - // Both actions will be taken in the regular reconciliation, so we can simply continue - // here - if target_replicas > 0 { - tracing::info!( - "Version change detected, we'll need to scale down the cluster for a full restart." 
- ); - Ok(ClusterVersionUpdateState::UpdateRequested) - } else { - tracing::info!("Cluster has been stopped for a restart, will scale back up."); - Ok(ClusterVersionUpdateState::ClusterStopped) - } - } else { - // No version change detected, propagate this to the reconciliation - Ok(ClusterVersionUpdateState::NoVersionChange) - } - } - None => { - // No deployed version set in status, this is probably the first reconciliation ever - // for this cluster, so just let it progress normally - tracing::debug!( - "No deployed version found for this cluster, this is probably the first start, continue reconciliation" - ); - Ok(ClusterVersionUpdateState::NoVersionChange) - } - } -} diff --git a/rust/operator-binary/src/reporting_task/mod.rs b/rust/operator-binary/src/reporting_task/mod.rs deleted file mode 100644 index cdee4c54..00000000 --- a/rust/operator-binary/src/reporting_task/mod.rs +++ /dev/null @@ -1,399 +0,0 @@ -//! The NiFi Reporting Task for Prometheus metrics is created via the NiFi Rest API. -//! -//! This module contains methods to create all required resources (Job, Service) and helper methods -//! to create the Prometheus Reporting Task. -//! -//! Before NiFi 1.25.0 there was only the actual Kubernetes Job required to create and run the Reporting Task Job. -//! -//! Due to changes in the JWT validation in 1.25.0, the issuer refers to the FQDN of the Pod that was created, e.g.: -//! { -//! "sub": "admin", -//! "iss": "test-nifi-node-default-0.test-nifi-node-default-headless.default.svc.cluster.local:8443", -//! } -//! which was different in e.g. 1.23.2 -//! { -//! "sub": "admin", -//! "iss": "SingleUserLoginIdentityProvider", -//! } -//! This caused problems when using the generated JWT against a different node (due to randomness of the service). -//! -//! "An error occurred while attempting to decode the Jwt: Signed JWT rejected: Another algorithm expected, or no matching key(s) found" -//! -//! 
Therefore, since the support of NiFi 1.25.0, an additional service for the Reporting Task Job containing a -//! random but deterministic NiFi node to ensure the communication with a single node. -//! -use std::collections::BTreeMap; - -use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_operator::{ - builder::{ - self, - meta::ObjectMetaBuilder, - pod::{ - PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, - security::PodSecurityContextBuilder, volume::SecretFormat, - }, - }, - commons::product_image_selection::ResolvedProductImage, - k8s_openapi::{ - DeepMerge, - api::{ - batch::v1::{Job, JobSpec}, - core::v1::{Service, ServicePort, ServiceSpec}, - }, - }, - kube::ResourceExt, - kvp::Labels, - shared::time::Duration, - utils::cluster_info::KubernetesClusterInfo, -}; - -use crate::{ - controller::build_recommended_labels, - crd::{APP_NAME, HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT, NifiRole, v1alpha1}, - security::{ - authentication::{NifiAuthenticationConfig, STACKABLE_ADMIN_USERNAME}, - build_tls_volume, - }, -}; - -const REPORTING_TASK_CERT_VOLUME_NAME: &str = "tls"; -const REPORTING_TASK_CERT_VOLUME_MOUNT: &str = "/stackable/cert"; -const REPORTING_TASK_CONTAINER_NAME: &str = "reporting-task"; - -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display("object defines no name"))] - ObjectHasNoName, - - #[snafu(display("object defines no namespace"))] - ObjectHasNoNamespace, - - #[snafu(display("failed to build metadata"))] - MetadataBuild { - source: stackable_operator::builder::meta::Error, - }, - - #[snafu(display("illegal container name: [{container_name}]"))] - IllegalContainerName { - source: stackable_operator::builder::pod::container::Error, - container_name: String, - }, - - #[snafu(display("object is missing metadata to build owner reference"))] - ObjectMissingMetadataForOwnerRef { - source: stackable_operator::builder::meta::Error, - }, - - #[snafu(display("failed to add Authentication Volumes and VolumeMounts"))] 
- AddAuthVolumes { - source: crate::security::authentication::Error, - }, - - #[snafu(display("failed to build labels"))] - LabelBuild { - source: stackable_operator::kvp::LabelError, - }, - - #[snafu(display("failed to build secret volume"))] - SecretVolumeBuildFailure { source: crate::security::Error }, - - #[snafu(display("failed to create reporting task service, no role groups defined"))] - FailedBuildReportingTaskService, - - #[snafu(display("failed to add needed volume"))] - AddVolume { source: builder::pod::Error }, - - #[snafu(display("failed to add needed volumeMount"))] - AddVolumeMount { - source: builder::pod::container::Error, - }, -} - -type Result = std::result::Result; - -/// Build required resources to create the reporting task in NiFi versions 1.x. -/// -/// This will return -/// * a Job that creates and runs the reporting task via the NiFi Rest API. -/// * a Service that contains of one single NiFi node. -/// -/// The Service is required in order to communicate only with one designated NiFi node. -/// This is necessary as the generated JWT was changed in 1.25.0 and corrected the issuer -/// from SingleUserLoginIdentityProvider to the FQDN of the pod. -/// The NiFi role service will randomly delegate to different NiFi nodes which will -/// then fail requests to other nodes. -/// -/// NiFi 2.x and above automatically server Prometheus metrics via the API, but as of 2024-11-08 -/// requires authentication. 
-pub fn build_maybe_reporting_task( - nifi: &v1alpha1::NifiCluster, - resolved_product_image: &ResolvedProductImage, - cluster_info: &KubernetesClusterInfo, - authentication_config: &NifiAuthenticationConfig, - sa_name: &str, -) -> Result> { - if resolved_product_image.product_version.starts_with("1.") { - Ok(Some(( - build_reporting_task_job( - nifi, - resolved_product_image, - cluster_info, - authentication_config, - sa_name, - )?, - build_reporting_task_service(nifi, resolved_product_image)?, - ))) - } else { - Ok(None) - } -} - -/// Return the name of the reporting task. -pub fn build_reporting_task_service_name(nifi_cluster_name: &str) -> String { - format!("{nifi_cluster_name}-{REPORTING_TASK_CONTAINER_NAME}") -} - -/// Return the FQDN (with namespace, domain) of the reporting task. -pub fn build_reporting_task_fqdn_service_name( - nifi: &v1alpha1::NifiCluster, - cluster_info: &KubernetesClusterInfo, -) -> Result { - let nifi_cluster_name = nifi.name_any(); - let nifi_namespace: &str = &nifi.namespace().context(ObjectHasNoNamespaceSnafu)?; - let reporting_task_service_name = build_reporting_task_service_name(&nifi_cluster_name); - let cluster_domain = &cluster_info.cluster_domain; - Ok(format!( - "{reporting_task_service_name}.{nifi_namespace}.svc.{cluster_domain}" - )) -} - -/// Return the name of the first pod belonging to the first role group that contains more than 0 replicas. -/// If no replicas are set in any rolegroup (e.g. HPA, see ) -/// return the first rolegroup just in case. -/// This is required to only select a single node in the Reporting Task Service. 
-fn get_reporting_task_service_selector_pod(nifi: &v1alpha1::NifiCluster) -> Result { - let cluster_name = nifi.name_any(); - let node_name = NifiRole::Node.to_string(); - - // sort the rolegroups to avoid random sorting and therefore unnecessary reconciles - let sorted_role_groups = nifi - .spec - .nodes - .iter() - .flat_map(|role| &role.role_groups) - .collect::>(); - - let mut selector_role_group = None; - for (role_group_name, role_group) in sorted_role_groups { - // just pick the first rolegroup in case no replicas are set - if selector_role_group.is_none() { - selector_role_group = Some(role_group_name); - } - - if let Some(replicas) = role_group.replicas { - if replicas > 0 { - selector_role_group = Some(role_group_name); - break; - } - } - } - - Ok(format!( - "{cluster_name}-{node_name}-{role_group_name}-0", - role_group_name = selector_role_group.context(FailedBuildReportingTaskServiceSnafu)? - )) -} - -/// Build the internal Reporting Task Service in order to communicate with a single NiFi node. -fn build_reporting_task_service( - nifi: &v1alpha1::NifiCluster, - resolved_product_image: &ResolvedProductImage, -) -> Result { - let nifi_cluster_name = nifi.name_any(); - let role_name = NifiRole::Node.to_string(); - let mut selector: BTreeMap = Labels::role_selector(nifi, APP_NAME, &role_name) - .context(LabelBuildSnafu)? - .into(); - - let service_selector_pod = get_reporting_task_service_selector_pod(nifi)?; - selector.insert( - "statefulset.kubernetes.io/pod-name".to_string(), - service_selector_pod, - ); - - Ok(Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(nifi) - .name(build_reporting_task_service_name(&nifi_cluster_name)) - .ownerreference_from_resource(nifi, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(&build_recommended_labels( - nifi, - &resolved_product_image.app_version_label_value, - &role_name, - "global", - )) - .context(MetadataBuildSnafu)? 
- .build(), - spec: Some(ServiceSpec { - ports: Some(vec![ServicePort { - name: Some(HTTPS_PORT_NAME.to_string()), - port: HTTPS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }]), - selector: Some(selector), - ..ServiceSpec::default() - }), - status: None, - }) -} - -/// Build the [`Job`](`stackable_operator::k8s_openapi::api::batch::v1::Job`) that creates a -/// NiFi `ReportingTask` in order to enable JVM and NiFi metrics. -/// -/// The Job is run via the [`tools`](https://github.com/stackabletech/docker-images/tree/main/tools) -/// docker image and more specifically the `create_nifi_reporting_task.py` Python script. -/// -/// This script uses the [`nipyapi`](https://nipyapi.readthedocs.io/en/latest/readme.html) -/// library to authenticate and run the required REST calls to the NiFi REST API. -/// -/// In order to authenticate we need the `username` and `password` from the -/// [`NifiAuthenticationConfig`](`crate::security::authentication::NifiAuthenticationConfig`) -/// as well as a public certificate provided by the Stackable -/// [`secret-operator`](https://github.com/stackabletech/secret-operator) -/// -fn build_reporting_task_job( - nifi: &v1alpha1::NifiCluster, - resolved_product_image: &ResolvedProductImage, - cluster_info: &KubernetesClusterInfo, - nifi_auth_config: &NifiAuthenticationConfig, - sa_name: &str, -) -> Result { - let reporting_task_fqdn_service_name = - build_reporting_task_fqdn_service_name(nifi, cluster_info)?; - let product_version = &resolved_product_image.product_version; - let nifi_connect_url = - format!("https://{reporting_task_fqdn_service_name}:{HTTPS_PORT}/nifi-api",); - - let (admin_username_file, admin_password_file) = - nifi_auth_config.get_user_and_password_file_paths(); - - let user_name_command = if admin_username_file.is_empty() { - // In case of the username being simple (e.g admin for SingleUser) just use it as is - format!("-u {STACKABLE_ADMIN_USERNAME}") - } else { - // If the username is 
a bind dn (e.g. cn=integrationtest,ou=my users,dc=example,dc=org) we have to extract the cn/dn/uid (in this case integrationtest) - format!( - "-u \"$(cat {admin_username_file} | grep -oP '((cn|dn|uid)=\\K[^,]+|.*)' | head -n 1)\"" - ) - }; - - let args = [ - "/stackable/python/create_nifi_reporting_task.py".to_string(), - format!("-n {nifi_connect_url}"), - user_name_command, - format!("-p \"$(cat {admin_password_file})\""), - format!("-m {METRICS_PORT}"), - format!("-c {REPORTING_TASK_CERT_VOLUME_MOUNT}/ca.crt"), - ]; - let mut cb = ContainerBuilder::new(REPORTING_TASK_CONTAINER_NAME).with_context(|_| { - IllegalContainerNameSnafu { - container_name: REPORTING_TASK_CONTAINER_NAME.to_string(), - } - })?; - cb.image_from_product_image(resolved_product_image) - .command(vec!["sh".to_string(), "-c".to_string()]) - .args(vec![args.join(" ")]) - // The VolumeMount for the secret operator key store certificates - .add_volume_mount( - REPORTING_TASK_CERT_VOLUME_NAME, - REPORTING_TASK_CERT_VOLUME_MOUNT, - ) - .context(AddVolumeMountSnafu)? - .resources( - ResourceRequirementsBuilder::new() - .with_cpu_request("100m") - .with_cpu_limit("400m") - .with_memory_request("512Mi") - .with_memory_limit("512Mi") - .build(), - ); - - let job_name = format!( - "{}-create-reporting-task-{}", - nifi.name_any(), - product_version.replace('.', "-").to_ascii_lowercase() - ); - - let mut pb = PodBuilder::new(); - nifi_auth_config - .add_volumes_and_mounts(&mut pb, vec![&mut cb]) - .context(AddAuthVolumesSnafu)?; - - let mut pod_template = pb - .metadata( - ObjectMetaBuilder::new() - .name_and_namespace(nifi) - .name(job_name.clone()) - .ownerreference_from_resource(nifi, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? 
- .build(), - ) - .image_pull_secrets_from_product_image(resolved_product_image) - .restart_policy("OnFailure") - .service_account_name(sa_name) - .security_context(PodSecurityContextBuilder::new().fs_group(1000).build()) - .add_container(cb.build()) - .add_volume( - build_tls_volume( - nifi, - REPORTING_TASK_CERT_VOLUME_NAME, - Vec::::new(), - SecretFormat::TlsPem, - // The certificate is only used for the REST API call, so a short lifetime is sufficient. - // There is no correct way to configure this job since it's an implementation detail. - // Also it will be dropped when support for 1.x is removed. - &Duration::from_days_unchecked(1), - // There is no listener volume we could get certs for - None, - ) - .context(SecretVolumeBuildFailureSnafu)?, - ) - .context(AddVolumeSnafu)? - .build_template(); - - pod_template.merge_from( - nifi.spec - .cluster_config - .create_reporting_task_job - .pod_overrides - .clone(), - ); - - let job = Job { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(nifi) - .name(job_name) - .ownerreference_from_resource(nifi, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(&build_recommended_labels( - nifi, - &resolved_product_image.app_version_label_value, - "global", - "global", - )) - .context(MetadataBuildSnafu)? 
- .build(), - spec: Some(JobSpec { - backoff_limit: Some(100), - ttl_seconds_after_finished: Some(120), - template: pod_template, - ..JobSpec::default() - }), - ..Job::default() - }; - - Ok(job) -} diff --git a/rust/operator-binary/src/service.rs b/rust/operator-binary/src/service.rs index 9bd4a722..d4699d13 100644 --- a/rust/operator-binary/src/service.rs +++ b/rust/operator-binary/src/service.rs @@ -8,7 +8,7 @@ use stackable_operator::{ role_utils::RoleGroupRef, }; -use crate::crd::{HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1}; +use crate::crd::{HTTPS_PORT, HTTPS_PORT_NAME, v1alpha1}; #[derive(Snafu, Debug)] pub enum Error { @@ -44,7 +44,7 @@ pub fn build_rolegroup_headless_service( // Internal communication does not need to be exposed type_: Some("ClusterIP".to_string()), cluster_ip: Some("None".to_string()), - ports: Some(headless_service_ports()), + ports: Some(vec![headless_service_port()]), selector: Some(selector), publish_not_ready_addresses: Some(true), ..ServiceSpec::default() @@ -59,7 +59,6 @@ pub fn build_rolegroup_metrics_service( role_group_ref: &RoleGroupRef, object_labels: ObjectLabels, selector: BTreeMap, - product_version: &str, ) -> Result { Ok(Service { metadata: ObjectMetaBuilder::new() @@ -70,13 +69,13 @@ pub fn build_rolegroup_metrics_service( .with_recommended_labels(&object_labels) .context(MetadataBuildSnafu)? 
.with_labels(prometheus_labels()) - .with_annotations(prometheus_annotations(product_version)) + .with_annotations(prometheus_annotations()) .build(), spec: Some(ServiceSpec { // Internal communication does not need to be exposed type_: Some("ClusterIP".to_string()), cluster_ip: Some("None".to_string()), - ports: Some(vec![metrics_service_port(product_version)]), + ports: Some(vec![metrics_service_port()]), selector: Some(selector), publish_not_ready_addresses: Some(true), ..ServiceSpec::default() @@ -85,33 +84,22 @@ pub fn build_rolegroup_metrics_service( }) } -fn headless_service_ports() -> Vec { - vec![ServicePort { +fn headless_service_port() -> ServicePort { + ServicePort { name: Some(HTTPS_PORT_NAME.into()), port: HTTPS_PORT.into(), protocol: Some("TCP".to_string()), ..ServicePort::default() - }] + } } -/// Returns the metrics port based on the NiFi version -/// V1: Uses extra port via JMX exporter -/// V2: Uses NiFi HTTP(S) port for metrics -pub fn metrics_service_port(product_version: &str) -> ServicePort { - if product_version.starts_with("1.") { - ServicePort { - name: Some(METRICS_PORT_NAME.to_string()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - } - } else { - ServicePort { - name: Some(HTTPS_PORT_NAME.into()), - port: HTTPS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - } +/// Returns the metrics port, which is the NiFi HTTP(S) port. +pub fn metrics_service_port() -> ServicePort { + ServicePort { + name: Some(HTTPS_PORT_NAME.into()), + port: HTTPS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() } } @@ -125,17 +113,14 @@ fn prometheus_labels() -> Labels { /// These annotations can be used in a ServiceMonitor. 
/// /// see also -fn prometheus_annotations(product_version: &str) -> Annotations { - let (path, port, scheme) = if product_version.starts_with("1.") { - ("/metrics", METRICS_PORT, "http") - } else { - ("/nifi-api/flow/metrics/prometheus", HTTPS_PORT, "https") - }; - +fn prometheus_annotations() -> Annotations { Annotations::try_from([ - ("prometheus.io/path".to_owned(), path.to_owned()), - ("prometheus.io/port".to_owned(), port.to_string()), - ("prometheus.io/scheme".to_owned(), scheme.to_owned()), + ( + "prometheus.io/path".to_owned(), + "/nifi-api/flow/metrics/prometheus".to_owned(), + ), + ("prometheus.io/port".to_owned(), HTTPS_PORT.to_string()), + ("prometheus.io/scheme".to_owned(), "https".to_owned()), ("prometheus.io/scrape".to_owned(), "true".to_owned()), ]) .expect("should be valid annotations")