是可以删除group.id,可参考:
另外一种方式是重置offset
,分2步:
时间
来获取到offset
offset
c++的我没用过,java的关键的2个命令如下,可以帮助你找找对应c++的代码:
consumer.offsetsForTimes
// 通过时间,定位offsetconsumer.seek
// 重新定位offset大佬的回答很生动形象,瞬间理解了很多,我试了一下,现场和您说的是一样的。
但现在需要的是,完全不需要老的报纸,每次用新的group.id感觉不太好,是否有接口能在报社名单上消除这个id,我下次再用,它就认为是新的id。将不会给我老的报纸,或者配置中有没有设置,告诉报社,即使名单上有了这个group.id
,也不用给他旧的 报纸,它来了直接给最新的。
是的,该错误表明,有很多信息被缓冲,在超时之前无法刷新。为了解决这个问题,你可以
offset.flush.timeout.ms
配置参数。producer.buffer.memory
来减少被缓冲的数据量。当你有相当大的消息时,这将是最好的选择。
你想像一下订报纸,在你没有订阅之前,报社根本就不知道你,当你订阅的这一刻,新的报纸才会发给你,老的报纸是不会给你的。这时,你消费者停掉了,发报员依然会为你保留报纸,因为你已经在报社有名单了。这个名单是根据是group.id
确认的,如果你想每次都是最新的,可以每次都用新的group.id
。
另外要设置成auto.offset.reset=latest
,解释如下:
但是,这个latest
并不代表这一时刻的消息,如果你已经在报社有名单了,那就是基于报社记录你最后一次拿取报纸时间,把这些‘报纸’都发给你。
您好,我的消费日志中出现大量commit of offsets timed out.而且消费数据很慢造成了数据积压。这个超时时间是由offset.flush.timeout.ms控制的吗?如果是,我增大它的值有什么约束条件吗?
使用String.format()
public static String afterTextChanged(String view) {
String s = null;
try {
s = String.format("%,d", Long.parseLong(view));
} catch (NumberFormatException e) {
return view;
}
return s;
}
结果
123
1,234
12,345
12,345,678
改了之后:虽然钉钉还会在网络波动的时候报警,因为数据从生产到目标topic大概会用1分钟时间。但是再也没有重复消费情况了。感谢!
啊,不知道理解的对不对?:
第一个代码是:病人是对象,注射类型、是否注射、病人都是其属性。显然这些东西作为病人的属性是不“面向对象的”。就比如我们不能说猫是狗的对象,只能说狗有个属性是玩,把对象猫给它,它就能玩了。
第二个代码是:病人是对象,他有个方法是注射疫苗,直接调方法就能实现功能。但是细节未知。
第三个代码是:病人、护士、疫苗都是不同对象,先选出疫苗类型,护士可以做的事是打标准疫苗,只要将疫苗和病人给护士,护士就能完成这个事情。
消费者是批量拉取消息,也就是说在你停止项目的时候,如果此时刚拉取了消息,提交offsr,但程序只处理一部分,那剩下的消息就会漏掉。
apiVersion: v1
kind: ServiceAccount
metadata:
name: sa-all
namespace: test-namespace
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
annotations:
rbac.authorization.kubernetes.io/autoupdate: "true"
labels:
kubernetes.io/bootstrapping: rbac-defaults
name: cluster-role-all
rules:
- apiGroups:
- '*'
resources:
- '*'
verbs:
- '*'
- nonResourceURLs:
- '*'
verbs:
- '*'
*
代表所有。
verbs包括 ["get", "list", "watch", "create", "update", "patch", "delete"]
权限。
你也可以设置部份权限和资源,如下
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: monitoring-endpoints
labels:
rbac.example.com/aggregate-to-monitoring: "true"
# 当你创建 "monitoring-endpoints" ClusterRole 时,
# 下面的规则会被添加到 "monitoring" ClusterRole 中
rules:
- apiGroups: [""]
resources: ["services", "endpoints", "pods"]
verbs: ["get", "list", "watch"]
通过kubectl api-resources
命令可以查看kubernetes当前版本apiGroups和resources。
NAME SHORTNAMES APIGROUP NAMESPACED KIND
bindings true Binding
componentstatuses cs false ComponentStatus
configmaps cm true ConfigMap
endpoints ep true Endpoints
events ev true Event
limitranges limits true LimitRange
namespaces ns false Namespace
nodes no false Node
persistentvolumeclaims pvc true PersistentVolumeClaim
persistentvolumes pv false PersistentVolume
pods po true Pod
podtemplates true PodTemplate
replicationcontrollers rc true ReplicationController
resourcequotas quota true ResourceQuota
secrets true Secret
serviceaccounts sa true ServiceAccount
services svc true Service
mutatingwebhookconfigurations admissionregistration.k8s.io false MutatingWebhookConfiguration
validatingwebhookconfigurations admissionregistration.k8s.io false ValidatingWebhookConfiguration
customresourcedefinitions crd,crds apiextensions.k8s.io false CustomResourceDefinition
apiservices apiregistration.k8s.io false APIService
applications app.k8s.io true Application
controllerrevisions apps true ControllerRevision
daemonsets ds apps true DaemonSet
deployments deploy apps true Deployment
replicasets rs apps true ReplicaSet
statefulsets sts apps true StatefulSet
workflows wf argoproj.io true Workflow
tokenreviews authentication.k8s.io false TokenReview
localsubjectaccessreviews authorization.k8s.io true LocalSubjectAccessReview
selfsubjectaccessreviews authorization.k8s.io false SelfSubjectAccessReview
selfsubjectrulesreviews authorization.k8s.io false SelfSubjectRulesReview
subjectaccessreviews authorization.k8s.io false SubjectAccessReview
horizontalpodautoscalers hpa autoscaling true HorizontalPodAutoscaler
cronjobs cj batch true CronJob
jobs batch true Job
certificatesigningrequests csr certificates.k8s.io false CertificateSigningRequest
leases coordination.k8s.io true Lease
events ev events.k8s.io true Event
daemonsets ds extensions true DaemonSet
deployments deploy extensions true Deployment
ingresses ing extensions true Ingress
networkpolicies netpol extensions true NetworkPolicy
podsecuritypolicies psp extensions false PodSecurityPolicy
replicasets rs extensions true ReplicaSet
pytorchjobs kubeflow.org true PyTorchJob
scheduledworkflows swf kubeflow.org true ScheduledWorkflow
studyjobs kubeflow.org true StudyJob
tfjobs kubeflow.org true TFJob
compositecontrollers cc,cctl metacontroller.k8s.io false CompositeController
controllerrevisions metacontroller.k8s.io true ControllerRevision
decoratorcontrollers dec,decorators metacontroller.k8s.io false DecoratorController
alertmanagers monitoring.coreos.com true Alertmanager
prometheuses monitoring.coreos.com true Prometheus
prometheusrules monitoring.coreos.com true PrometheusRule
servicemonitors monitoring.coreos.com true ServiceMonitor
networkpolicies netpol networking.k8s.io true NetworkPolicy
poddisruptionbudgets pdb policy true PodDisruptionBudget
podsecuritypolicies psp policy false PodSecurityPolicy
clusterrolebindings rbac.authorization.k8s.io false ClusterRoleBinding
clusterroles rbac.authorization.k8s.io false ClusterRole
rolebindings rbac.authorization.k8s.io true RoleBinding
roles rbac.authorization.k8s.io true Role
priorityclasses pc scheduling.k8s.io false PriorityClass
storageclasses sc storage.k8s.io false StorageClass
volumeattachments storage.k8s.io false VolumeAttachment
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
annotations:
rbac.authorization.kubernetes.io/autoupdate: "true"
labels:
kubernetes.io/bootstrapping: rbac-defaults
name: cluster-role-all-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-role-all
subjects:
- kind: ServiceAccount
name: sa-all
namespace: test-namespace
在你使用的test-namespace
命名空间中的所有的工作负载
当中都可以使用ServiceAccount。
在Pod当中使用
如果当前工作负载未绑定ServiceAccount,则会自动绑定
default
ServiceAccount。
apiVersion: v1
kind: Pod
metadata:
name: test-pod
namespace: test-namespace
spec:
serviceAccountName: sa-all
automountServiceAccountToken: false
...
另外一种方式,还可以通过文件卷挂载的方式使用
apiVersion: v1
kind: Pod
metadata:
name: test-pod
namespace: test-namespace
spec:
containers:
- image: nginx
name: nginx
volumeMounts:
- mountPath: /var/run/secrets/tokens
name: vault-token
serviceAccountName: sa-all
volumes:
- name: vault-token
projected:
sources:
- serviceAccountToken:
path: vault-token
expirationSeconds: 7200 # 过期时间
audience: vault
kubernetes会替 Pod 请求令牌并将其保存起来,通过将令牌存储到一个可配置的 路径使之在 Pod 内可用,并在令牌快要到期的时候刷新它。 kubelet 会在令牌存在期达到其 TTL 的 80% 的时候或者令牌生命期超过 24 小时 的时候主动轮换它。
感觉楼主的那个鸡蛋标签篮子的抽象很有意思,我这里做个拓展。
某一天举办了一次吃鸡蛋大赛,可以以小组的形式参加,报名结束后,一共有两个小组参加:
在他们面前各自放着三条流水线,鸡蛋从流水线滚下来(3 个 分区),厨师(生产者)在后台不停的往流水线里加鸡蛋,防止参赛选手(消费者)不够吃。
A小组正巧有三个人,就一人负责一条流水线,按照流水线的上鸡蛋的顺序吃。
B小组只有两个人,其中 B1 饭量比较大,独自负责两条流水线,这条流水线吃一个,那条流水线吃一个。
这里就对应了一个参赛选择按鸡蛋的生产顺序进行吃鸡蛋,比如A小组
,那么每个人吃的鸡蛋的顺序都是按照厨师放入流水线的顺序。而B小组
的B1
这个人,就一会儿这吃个一会儿吃那个,没有一个顺序性保证了。
比赛的结局就是谁最后吃的鸡蛋最多,谁就获胜。
结论:max.request.size=104857600
是connector内部topic的prodcuer的配置.producer.max.request.size=104857600
是connector source record 的prodcuer的配置.