#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

OUT_TYPE="${1:-local}" # other type: s3
SINK_TO_TEST="${2:-"StreamingFileSink"}"
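# Usage sketch (the script file name and the FileSink option are assumptions
# derived from the defaults above, not verified):
#   ./test_file_sink.sh <local|s3> <StreamingFileSink|FileSink>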
source "$(dirname "$0")"/common.sh
|
|
|
|
# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download folder for remote data;
# the helper functions below access this folder
RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
mkdir -p "${LOCAL_JOB_OUTPUT_PATH}"

# JOB_OUTPUT_PATH is the location the job writes its data to
JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}"

###################################
# Get all lines in part files and sort them numerically.
#
# Globals:
#   LOCAL_JOB_OUTPUT_PATH
# Arguments:
#   None
# Returns:
#   sorted content of part files
###################################
function get_complete_result {
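  # "-iname part-*" picks up the sink's part files; "sort -g" orders the
  # concatenated lines by general numeric value rather than lexicographically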
find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
|
|
}
|
|
|
|
###################################
# Get the total number of lines in the part files.
#
# Globals:
#   LOCAL_JOB_OUTPUT_PATH
# Arguments:
#   None
# Returns:
#   the number of lines in the part files
###################################
function get_total_number_of_valid_lines {
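  # tr strips the whitespace padding that some wc implementations (e.g. BSD wc) emit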
  get_complete_result | wc -l | tr -d '[:space:]'
}

if [ "${OUT_TYPE}" == "local" ]; then
|
|
echo "[INFO] Test run in local environment: No S3 environment is loaded."
|
|
elif [ "${OUT_TYPE}" == "s3" ]; then
|
|
source "$(dirname "$0")"/common_s3_minio.sh
|
|
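  # common_s3_minio.sh is expected to provide the s3_* helpers used below
  # (s3_setup, s3_get_*, s3_delete_*)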
  s3_setup hadoop

  # override JOB_OUTPUT_PATH to point to S3
  S3_DATA_PREFIX="${RANDOM_PREFIX}"
  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
  set_config_key "execution.checkpointing.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"

  # override the local implementation for S3 runs
  function get_complete_result {
    # copies the data from S3 to the local LOCAL_JOB_OUTPUT_PATH ...
    s3_get_by_full_path_and_filename_prefix "$LOCAL_JOB_OUTPUT_PATH" "$S3_DATA_PREFIX" true "part-"

    # ... and prints the sorted output
    find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
  }

  # override the local implementation for S3 runs
  function get_total_number_of_valid_lines {
    s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-"
  }

  # make sure we delete the uploaded S3 data at the end
  function out_cleanup {
    s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
    s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
  }

  on_exit out_cleanup
else
  echo "[ERROR] Unknown out type: ${OUT_TYPE}"
  exit 1
fi

# randomly set up OpenSSL with dynamically or statically linked libraries
OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
echo "Executing test with ${OPENSSL_LINKAGE} OpenSSL linkage (random selection between 'dynamic' and 'static')"

set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
|
|
# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback at the end of the test run
|
|
on_exit rollback_openssl_lib
|
|
|
|
set_config_key "metrics.fetcher.update-interval" "2000"
|
|
# this test relies on global failovers
|
|
set_config_key "jobmanager.execution.failover-strategy" "full"
|
|
|
|
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"

###################################
# Waits until the expected number of values has been written, within a timeout.
# If the timeout expires, exits with return code 1.
#
# Globals:
#   None
# Arguments:
#   $1: the number of expected values
#   $2: timeout in seconds
# Returns:
#   None
###################################
function wait_for_complete_result {
  local expected_number_of_values=$1
  local polling_timeout=$2
  local polling_interval=1
  local seconds_elapsed=0

  local number_of_values=0
  local previous_number_of_values=-1

  while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
    if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
      echo "Did not produce expected number of values within ${polling_timeout}s"
      exit 1
    fi

    sleep ${polling_interval}
    ((seconds_elapsed += polling_interval))

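    # re-read the current count and log progress only when it changes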
    number_of_values=$(get_total_number_of_valid_lines)
    if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
      echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
      previous_number_of_values=${number_of_values}
    fi
  done
}

function run_file_sink_test {
  start_cluster

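  # start extra TaskManagers up front, presumably so the job can keep running
  # while individual TMs are killed and restarted below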
"${FLINK_DIR}/bin/taskmanager.sh" start
|
|
"${FLINK_DIR}/bin/taskmanager.sh" start
|
|
"${FLINK_DIR}/bin/taskmanager.sh" start
|
|
|
|
echo "Submitting job."
|
|
CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}" \
|
|
--sinkToTest "${SINK_TO_TEST}")
|
|
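  # the sed keeps the last whitespace-separated token of the matched line, i.e. the JobID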
JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
|
|
|
|
  if [[ -z $JOB_ID ]]; then
    echo "Job could not be submitted."
    echo "${CLIENT_OUTPUT}"
    exit 1
  fi

  wait_job_running "${JOB_ID}"

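  # wait for a few completed checkpoints so the TM kills below exercise state recovery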
  wait_num_checkpoints "${JOB_ID}" 3

echo "Killing TM"
|
|
kill_random_taskmanager
|
|
|
|
echo "Starting TM"
|
|
"$FLINK_DIR/bin/taskmanager.sh" start
|
|
|
|
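  # first argument: the number of restarts observed before this failure
  # (assumed semantics of the wait_for_restart_to_complete helper from common.sh)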
  wait_for_restart_to_complete 0 "${JOB_ID}"

  echo "Killing 2 TMs"
  kill_random_taskmanager
  kill_random_taskmanager

  echo "Starting 2 TMs"
  "$FLINK_DIR/bin/taskmanager.sh" start
  "$FLINK_DIR/bin/taskmanager.sh" start

  wait_for_restart_to_complete 1 "${JOB_ID}"

echo "Waiting until all values have been produced"
|
|
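  # 60000 is the expected total number of values (assumed to match what
  # FileSinkProgram emits); poll for at most 900 seconds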
  wait_for_complete_result 60000 900

cancel_job "${JOB_ID}"
|
|
|
|
wait_job_terminal_state "${JOB_ID}" "CANCELED"
|
|
|
|
get_complete_result > "${TEST_DATA_DIR}/complete_result"
|
|
|
|
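  # check_result_hash compares a hash of the sorted output against the expected
  # value (the 32-hex-digit constant suggests MD5, but that is an assumption)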
check_result_hash "File Streaming Sink" "$TEST_DATA_DIR/complete_result" "6727342fdd3aae2129e61fc8f433fb6f"
|
|
}
|
|
|
|
# usual runtime is ~6 minutes
run_test_with_timeout 900 run_file_sink_test