Dynamic Source for Image Puller
Sure! In my case, I have a PostgreSQL database that stores the Docker images of the available profiles.
Every 15 minutes, the script fetches the images and checks whether they have already been pulled since the pod started.
Any image that has not been pulled yet is added to the list of init containers of a new DaemonSet.
The error handling could still be improved, though. For instance, if a single image cannot be pulled (e.g., because credentials are missing), the affected pods never reach the Running phase, so the script waits forever instead of skipping that image (see the sketch after the listing for one possible mitigation).
package main

import (
    "context"
    "database/sql"
    "fmt"
    "os"
    "slices"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"

    log "github.com/sirupsen/logrus"
    _ "github.com/lib/pq"
)

// doesDaemonSetExists reports whether the puller DaemonSet from a previous run still exists.
func doesDaemonSetExists(clientset *kubernetes.Clientset) (bool, error) {
    result, err := clientset.AppsV1().DaemonSets(Namespace).List(context.Background(), metav1.ListOptions{
        FieldSelector: fields.OneTermEqualSelector("metadata.name", "profile-puller-node").String(),
    })
    if err != nil {
        return false, err
    }
    return len(result.Items) == 1, nil
}

// areAllPodsRunning reports whether at least one puller pod exists and all of them are in the Running phase.
func areAllPodsRunning(clientset *kubernetes.Clientset) (bool, error) {
    pods, err := clientset.CoreV1().Pods(Namespace).List(context.Background(), metav1.ListOptions{
        LabelSelector: "app=profile-puller-node",
    })
    if err != nil {
        return false, err
    }
    if len(pods.Items) == 0 {
        return false, nil
    }
    for _, pod := range pods.Items {
        if pod.Status.Phase != v1.PodRunning {
            return false, nil
        }
    }
    return true, nil
}

// openDatabaseConnection opens (and pings) the PostgreSQL connection configured via the JH_DB_* environment variables.
func openDatabaseConnection() (*sql.DB, error) {
    connStr := fmt.Sprintf("user=%s dbname=%s password=%s host=%s sslmode=disable",
        os.Getenv("JH_DB_USER"),
        os.Getenv("JH_DB_NAME"),
        os.Getenv("JH_DB_PASSWORD"),
        os.Getenv("JH_DB_HOST"))
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, err
    }
    if err := db.Ping(); err != nil {
        return nil, err
    }
    return db, nil
}

// queryProfiles returns the distinct Docker images referenced by the profiles table.
func queryProfiles(db *sql.DB) ([]string, error) {
    query := `SELECT DISTINCT docker_image FROM profiles`
    rows, err := db.Query(query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    dockerImages := []string{}
    for rows.Next() {
        var dockerImage string
        if err := rows.Scan(&dockerImage); err != nil {
            return nil, err
        }
        dockerImages = append(dockerImages, dockerImage)
    }
    if err := rows.Err(); err != nil {
        return nil, err
    }
    return dockerImages, nil
}

// Namespace is the Kubernetes namespace the puller DaemonSet is created in.
const Namespace = "<Kubernetes namespace>"

func main() {
    log.Infoln("Starting profile puller")
    // Connect to cluster
    config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatalf("Could not get in-cluster config: %v", err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatalf("Could not connect to Kubernetes API: %v", err)
    }
    // "Cache" already pulled images
    imageCache := []string{}
    // Query images
    db, err := openDatabaseConnection()
    if err != nil {
        log.Fatalf("Failed to connect to database: %v", err)
    }
    for {
        time.Sleep(15 * time.Minute)
        // Check if image puller is still running
        running, err := doesDaemonSetExists(clientset)
        if err != nil {
            log.Errorf("Failed to check list of DaemonSets: %v", err)
            continue
        }
        if running {
            log.Warnln("Image puller is still pulling images from previous job")
            continue
        }
        queriedDockerImages, err := queryProfiles(db)
        if err != nil {
            log.Errorf("Failed to query docker images: %v", err)
            continue
        }
        // Filter images
        dockerImages := []string{}
        for i := range queriedDockerImages {
            if !slices.Contains(imageCache, queriedDockerImages[i]) {
                dockerImages = append(dockerImages, queriedDockerImages[i])
                imageCache = append(imageCache, queriedDockerImages[i])
            }
        }
        // Remove duplicates
        slices.Sort(dockerImages)
        dockerImages = slices.Compact(dockerImages)
        if len(dockerImages) == 0 {
            log.Infoln("Profiles contain only already pulled Docker images.")
            continue
        } else {
            log.Infof("Found a total of %d uncached Docker images to pull:", len(dockerImages))
            for i := range dockerImages {
                log.Infof(" - %s", dockerImages[i])
            }
        }
        // Build DaemonSet
        initContainers := []v1.Container{}
        for i := range dockerImages {
            dockerImageName := dockerImages[i]
            initContainer := v1.Container{
                Name:            fmt.Sprintf("profile-puller-%d", i),
                Image:           dockerImageName,
                ImagePullPolicy: v1.PullIfNotPresent,
                Command:         []string{"echo", "Hello World"},
            }
            initContainers = append(initContainers, initContainer)
        }
        daemonSet := appsv1.DaemonSet{
            ObjectMeta: metav1.ObjectMeta{Name: "profile-puller-node"},
            Spec: appsv1.DaemonSetSpec{
                Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
                    "name": "profile-puller-node",
                    "app":  "profile-puller-node",
                }},
                Template: v1.PodTemplateSpec{
                    ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
                        "name": "profile-puller-node",
                        "app":  "profile-puller-node",
                    }},
                    Spec: v1.PodSpec{
                        InitContainers: initContainers,
                        Containers: []v1.Container{
                            {Name: "pause", Image: "gcr.io/google_containers/pause:3.2"},
                        },
                    },
                },
            },
        }
        _, err = clientset.AppsV1().DaemonSets(Namespace).Create(context.Background(), &daemonSet, metav1.CreateOptions{})
        if err != nil {
            log.Errorf("Could not create DaemonSet: %v", err)
            continue
        }
        // Poll running pods to delete DaemonSet
        interval := 30 * time.Second
        for {
            allRunning, err := areAllPodsRunning(clientset)
            if err != nil {
                log.Errorf("Could not get pod status: %v", err)
                time.Sleep(interval)
                continue
            }
            if allRunning {
                log.Infof("All pods for DaemonSet %s are running. Proceeding to delete.", "profile-puller-node")
                err := clientset.AppsV1().DaemonSets(Namespace).Delete(context.Background(), "profile-puller-node", metav1.DeleteOptions{})
                if err != nil {
                    log.Errorf("Failed to delete DaemonSet: %v", err)
                    time.Sleep(interval)
                    continue
                }
                log.Infof("DaemonSet %s deleted successfully.", "profile-puller-node")
                break
            } else {
                log.Infof("Not all pods are running for DaemonSet %s. Checking again in %v...", "profile-puller-node", interval)
                time.Sleep(interval)
            }
        }
    }
}
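As mentioned at the top, a missing or unpullable image currently blocks the whole run, because the affected pods never become Running and the polling loop waits forever. One possible mitigation (just a sketch, not something I run in production) is to also inspect the init container statuses while polling and treat images stuck in an image-pull error as failed, so they can be logged and skipped. The helper below assumes the init container naming scheme from the listing ("profile-puller-<index>") and the usual kubelet waiting reasons ErrImagePull / ImagePullBackOff:

// findStuckImages returns the subset of dockerImages whose init container is stuck
// in an image-pull error on any puller pod. It maps the container name
// "profile-puller-<index>" back to the corresponding image.
func findStuckImages(clientset *kubernetes.Clientset, dockerImages []string) ([]string, error) {
    pods, err := clientset.CoreV1().Pods(Namespace).List(context.Background(), metav1.ListOptions{
        LabelSelector: "app=profile-puller-node",
    })
    if err != nil {
        return nil, err
    }
    stuck := []string{}
    for _, pod := range pods.Items {
        for _, status := range pod.Status.InitContainerStatuses {
            waiting := status.State.Waiting
            if waiting == nil {
                continue
            }
            if waiting.Reason != "ErrImagePull" && waiting.Reason != "ImagePullBackOff" {
                continue
            }
            // Recover the image index from the container name.
            var index int
            if _, err := fmt.Sscanf(status.Name, "profile-puller-%d", &index); err != nil {
                continue
            }
            if index >= 0 && index < len(dockerImages) && !slices.Contains(stuck, dockerImages[index]) {
                stuck = append(stuck, dockerImages[index])
            }
        }
    }
    return stuck, nil
}

The polling loop could call this every interval, log the stuck images, keep or remove them from imageCache (depending on whether they should be retried in a later cycle), and delete the DaemonSet once only stuck images remain instead of waiting indefinitely.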
Note that deploying the script as a pod requires a dedicated service account that is bound to the following Role:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: profile-puller
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list"]
  - apiGroups: ["apps"]
    resources: ["daemonsets"]
    verbs: ["get", "watch", "list", "create", "delete"]
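For completeness, the service account itself and its binding to the Role look roughly like this (the names are just examples; use whatever matches your deployment, and set serviceAccountName accordingly in the pod spec of the puller):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: profile-puller
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: profile-puller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: profile-puller
subjects:
  - kind: ServiceAccount
    name: profile-puller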
Please let me know if you have further questions!