使用下面的代码,将从游标 0 开始扫描 1000 个对象
SCAN 0 MATCH "foo:bar:*" COUNT 1000
结果,将获得一个要调用的新光标:
SCAN YOUR_NEW_CURSOR MATCH "foo:bar:*" COUNT 1000
要扫描整个列表,你需要调用 SCAN 直到光标返回零(即整个扫描)
使用 INFO 命令获取密钥数量,例如:
db0:keys=YOUR_AMOUNT_OF_KEYS,expires=0,avg_ttl=0
然后调用:
SCAN 0 MATCH "foo:bar:*" COUNT YOUR_AMOUNT_OF_KEYS
实际上我和这位的错误一样:https://www.orchome.com/6808 。加了Client之后报了同样的错误。我是根据这里来配置的:https://www.orchome.com/1966 。不知道这里是不是少了zookeeper相关的配置。
错误很明显吧:
No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/profiles.d/ssl/kafka/kafka_server_jaas.conf'.
无法删除是因为命名空间中仍然存在的资源引起的。
以下命令显示命名空间中剩余的资源:
kubectl api-resources --verbs=list --namespaced -o name \
| xargs -n 1 kubectl get --show-kind --ignore-not-found -n <namespace>
一旦你移除了这些资源之后,命名空间就能删掉了。
无法删除是因为命名空间中仍然存在的资源引起的。
以下命令显示命名空间中剩余的资源:
kubectl api-resources --verbs=list --namespaced -o name \
| xargs -n 1 kubectl get --show-kind --ignore-not-found -n <namespace>
一旦你移除了这些资源之后,命名空间就能删掉了。
嗯,注意要最后结束的时候掉下close(官方例子有,不要漏了)。
如果不想调close,就加一些休眠时间。(因为有些消息是还在缓存中,没来得及发送到kafka的时候,进程就结束了)
感谢大佬的指点,目前已经全部调通,包括kerberos环境!
非kerberos环境最后配置的格式就是上面贴的。
kerberos环境 大致还需要以下几点。
1、kafka-server端加了环境变量
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_jaas.conf"
2、/etc/krb5.conf文件可能需要加一行udp_preference_limit = 1 将udp改成tcp防止丢包(这个不一定需要)
3、客户端需要一个kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="zookeeper";
};
4、然后一些sasl的配置,监听器的配置就不赘述了
总结:之前对“主动发现集群机制”了解不够,也不知道消费时要对每一个broker都开一个长连接* 加上报错一直都是权限验证失败让人感觉是kerberos的问题,绕了很久。后面排除无关的因素,就很明显了。另外提醒ambari安装的kafka不管界面上配置的advertised.listeners是多少,内部代码还是会强行将listeners的值赋给advertised.listeners。
还是很感谢大佬的及时回复 耐心指导。期待以后更多的交流
公网ip只有一个,是不行的。因为kafka客户端要与每个node建立长连接,进行数据交换的。
你可以参考:kafka外网访问 ,里面有外网转发的原因。
另外注意,端口不能变,要映射一模一样的。
最后,你做测试的时候,可以先不用kerberos,一步一步来,如果内网、外网消费和生产都可以了,在加上认证。
对应的是pod的.metadata.uid
:
for d in /var/lib/kubelet/pods/*; do
p_u=$(basename "$d")
kubectl get po -A -o json | \
jq --arg pod_uuid "$p_u" -r '.items[]
| select(.metadata.uid == $pod_uuid)
| "uuid \($pod_uuid) is \(.metadata.name)"'
done
类似如下输出:
"Labels": {
"annotation.io.kubernetes.container.hash": "e44bee94",
"annotation.io.kubernetes.container.restartCount": "4",
"annotation.io.kubernetes.container.terminationMessagePath": "/dev/termination-log",
"annotation.io.kubernetes.container.terminationMessagePolicy": "File",
"annotation.io.kubernetes.pod.terminationGracePeriod": "30",
"io.kubernetes.container.logpath": "/var/log/pods/kube-system_storage-provisioner_b4aa3b1c-62c1-4661-a302-4c06b305b7c0/storage-provisioner/4.log",
"io.kubernetes.container.name": "storage-provisioner",
"io.kubernetes.docker.type": "container",
"io.kubernetes.pod.name": "storage-provisioner",
"io.kubernetes.pod.namespace": "kube-system",
"io.kubernetes.pod.uid": "b4aa3b1c-62c1-4661-a302-4c06b305b7c0",
"io.kubernetes.sandbox.id": "3950ec60121fd13116230cad388a4c6c4e417c660b7da475436f9ad5c9cf6738"
}
对应的是pod的.metadata.uid
:
for d in /var/lib/kubelet/pods/*; do
p_u=$(basename "$d")
kubectl get po -A -o json | \
jq --arg pod_uuid "$p_u" -r '.items[]
| select(.metadata.uid == $pod_uuid)
| "uuid \($pod_uuid) is \(.metadata.name)"'
done
类似如下输出:
"Labels": {
"annotation.io.kubernetes.container.hash": "e44bee94",
"annotation.io.kubernetes.container.restartCount": "4",
"annotation.io.kubernetes.container.terminationMessagePath": "/dev/termination-log",
"annotation.io.kubernetes.container.terminationMessagePolicy": "File",
"annotation.io.kubernetes.pod.terminationGracePeriod": "30",
"io.kubernetes.container.logpath": "/var/log/pods/kube-system_storage-provisioner_b4aa3b1c-62c1-4661-a302-4c06b305b7c0/storage-provisioner/4.log",
"io.kubernetes.container.name": "storage-provisioner",
"io.kubernetes.docker.type": "container",
"io.kubernetes.pod.name": "storage-provisioner",
"io.kubernetes.pod.namespace": "kube-system",
"io.kubernetes.pod.uid": "b4aa3b1c-62c1-4661-a302-4c06b305b7c0",
"io.kubernetes.sandbox.id": "3950ec60121fd13116230cad388a4c6c4e417c660b7da475436f9ad5c9cf6738"
}
对应的是pod的.metadata.uid
:
for d in /var/lib/kubelet/pods/*; do
p_u=$(basename "$d")
kubectl get po -A -o json | \
jq --arg pod_uuid "$p_u" -r '.items[]
| select(.metadata.uid == $pod_uuid)
| "uuid \($pod_uuid) is \(.metadata.name)"'
done
类似如下输出:
"Labels": {
"annotation.io.kubernetes.container.hash": "e44bee94",
"annotation.io.kubernetes.container.restartCount": "4",
"annotation.io.kubernetes.container.terminationMessagePath": "/dev/termination-log",
"annotation.io.kubernetes.container.terminationMessagePolicy": "File",
"annotation.io.kubernetes.pod.terminationGracePeriod": "30",
"io.kubernetes.container.logpath": "/var/log/pods/kube-system_storage-provisioner_b4aa3b1c-62c1-4661-a302-4c06b305b7c0/storage-provisioner/4.log",
"io.kubernetes.container.name": "storage-provisioner",
"io.kubernetes.docker.type": "container",
"io.kubernetes.pod.name": "storage-provisioner",
"io.kubernetes.pod.namespace": "kube-system",
"io.kubernetes.pod.uid": "b4aa3b1c-62c1-4661-a302-4c06b305b7c0",
"io.kubernetes.sandbox.id": "3950ec60121fd13116230cad388a4c6c4e417c660b7da475436f9ad5c9cf6738"
}